mirror of
https://github.com/kennethreitz/pytheory.git
synced 2026-06-05 23:00:20 +00:00
Compare commits
44 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 665a6f5de5 | |||
| 63362df697 | |||
| 755b33a63b | |||
| 40901d603d | |||
| 9b3cbd9065 | |||
| 0911947971 | |||
| c2f748d5f3 | |||
| 7a6942c8e4 | |||
| db7fabf985 | |||
| a07b7e7cea | |||
| 7245cd0e51 | |||
| 9e85a48d0e | |||
| 95b7bd830c | |||
| 150c57ed3d | |||
| d35d2b12f3 | |||
| 2084473788 | |||
| 970c730012 | |||
| 5f94e1939b | |||
| b649b2e659 | |||
| ed6ba2ab9f | |||
| fd317f9cfd | |||
| c57e29fe28 | |||
| 938024bfa2 | |||
| acc92f9a60 | |||
| 0d340dad30 | |||
| 1762500108 | |||
| ac2801d07d | |||
| c49ec27b1b | |||
| 5f4070c4a7 | |||
| 8735393aaa | |||
| 12f15d5138 | |||
| 20fc5e40b8 | |||
| 91d16595b7 | |||
| 54659d39b1 | |||
| 7cb2c166f9 | |||
| ba2038d7ff | |||
| 198fded20e | |||
| 51159e309a | |||
| 54df949089 | |||
| 30c70da468 | |||
| c633bd6f61 | |||
| bc652c37d0 | |||
| 417d9a6908 | |||
| f7d8f08446 |
@@ -2,6 +2,53 @@
|
||||
|
||||
All notable changes to PyTheory are documented here.
|
||||
|
||||
## 0.40.6
|
||||
|
||||
- **Saxophone presets cleaned up** — removed lowpass filters and vel_to_filter
|
||||
from all sax instrument presets (saxophone, alto_sax, tenor_sax, bari_sax).
|
||||
The saxophone wave function already shapes its own spectrum; the extra
|
||||
filters were dulling the tone.
|
||||
|
||||
## 0.40.5
|
||||
|
||||
- **Saxophone synth overhaul** — reed nonlinearity (asymmetric soft clipping),
|
||||
conical bore formant resonances, breath noise with attack envelope, separate
|
||||
reed buzz, key click transient, and sub-harmonic warmth. Vibrato dialed back
|
||||
to subtle, delayed onset.
|
||||
|
||||
## 0.40.4
|
||||
|
||||
- **Distortion overhaul** — multi-stage clipping (preamp → power amp →
|
||||
asymmetric rectifier) replaces single-stage tanh. Crunch, distorted,
|
||||
orange crunch, and metal guitar presets now sound properly driven.
|
||||
|
||||
## 0.40.3
|
||||
|
||||
- **Crotales synth** — tuned bronze discs with long ring and bright harmonics
|
||||
- **Tingsha synth** — paired Tibetan cymbals with beating from two detuned discs
|
||||
- **Rain stick** — cascading pebbles (steep and slow/shallow variants)
|
||||
- **Ocean drum** — steel beads rolling inside a frame drum, surf wash
|
||||
- **Cabasa** — metal bead chain on cylinder, bright metallic scrape
|
||||
- **Wind chimes** — multiple suspended metal tubes ringing at random offsets
|
||||
- **Finger cymbal** — single zill tap, bright metallic ping
|
||||
- `crotales`, `tingsha`, `singing_bowl`, `singing_bowl_ring` instrument presets
|
||||
- Audio demos in docs for all new sounds
|
||||
|
||||
## 0.40.2
|
||||
|
||||
- **Master compressor dialed back** — threshold raised from 0.5 to 0.7,
|
||||
makeup gain capped at 3x. Sparse arrangements no longer get
|
||||
over-amplified to clipping.
|
||||
|
||||
## 0.40.1
|
||||
|
||||
- **Singing bowl synth** — two variants: strike (mallet hit with chirp
|
||||
and long decay) and ring (rim-rubbed sustained tone with slow build).
|
||||
Inharmonic partials beat against near-degenerate mode pairs for
|
||||
authentic Himalayan bowl shimmer.
|
||||
- `singing_bowl` and `singing_bowl_ring` instrument presets
|
||||
- Audio demos in docs for both variants
|
||||
|
||||
## 0.40.0
|
||||
|
||||
- **Rhodes electric piano synth** — tine + tonebar + electromagnetic
|
||||
|
||||
Vendored
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Binary file not shown.
Vendored
BIN
Binary file not shown.
BIN
Binary file not shown.
BIN
Binary file not shown.
Vendored
BIN
Binary file not shown.
@@ -662,6 +662,41 @@ def gen_synth_kalimba():
|
||||
p.add(n, Duration.QUARTER, velocity=85)
|
||||
render("synth_kalimba", score)
|
||||
|
||||
def gen_synth_wurlitzer():
|
||||
score = Score("4/4", bpm=90)
|
||||
p = score.part("demo", instrument="wurlitzer", volume=0.5, reverb=0.25)
|
||||
p.hold("C3", Duration.WHOLE * 2, velocity=60)
|
||||
p.hold("Eb3", Duration.WHOLE * 2, velocity=55)
|
||||
p.hold("G3", Duration.WHOLE * 2, velocity=55)
|
||||
for n in ["G4", "Bb4", "C5", "Bb4", "G4", "F4", "Eb4", "G4"]:
|
||||
p.add(n, Duration.QUARTER, velocity=78)
|
||||
render("synth_wurlitzer", score)
|
||||
|
||||
def gen_synth_vibraphone():
|
||||
score = Score("4/4", bpm=90)
|
||||
p = score.part("demo", instrument="vibraphone", volume=0.5)
|
||||
for n in ["C5", "E5", "G5", "C6", "G5", "E5", "C5", "E5"]:
|
||||
p.add(n, Duration.QUARTER, velocity=75)
|
||||
render("synth_vibraphone", score)
|
||||
|
||||
def gen_synth_pipe_organ():
|
||||
score = Score("4/4", bpm=70)
|
||||
p = score.part("demo", instrument="pipe_organ", volume=0.5)
|
||||
p.hold("C3", Duration.WHOLE * 4, velocity=70)
|
||||
p.hold("G3", Duration.WHOLE * 4, velocity=65)
|
||||
for n in ["C4", "D4", "E4", "F4", "G4", "F4", "E4", "D4",
|
||||
"C4", "E4", "G4", "C5", "G4", "E4", "C4", "C4"]:
|
||||
p.add(n, Duration.QUARTER, velocity=75)
|
||||
render("synth_pipe_organ", score)
|
||||
|
||||
def gen_synth_choir():
|
||||
score = Score("4/4", bpm=70)
|
||||
p = score.part("demo", instrument="choir", volume=0.5)
|
||||
for n, v in [("C4", "ah"), ("E4", "oh"), ("G4", "ah"), ("C5", "ee"),
|
||||
("G4", "oh"), ("E4", "ah"), ("C4", "oo"), ("C4", "ah")]:
|
||||
p.add(n, Duration.HALF, velocity=70, lyric=v)
|
||||
render("synth_choir", score)
|
||||
|
||||
def gen_synth_organ():
|
||||
_synth_demo("organ", "organ_synth", envelope="organ")
|
||||
|
||||
@@ -823,6 +858,86 @@ def gen_synth_granular():
|
||||
render("synth_granular", score)
|
||||
|
||||
|
||||
def gen_synth_crotales():
|
||||
score = Score("4/4", bpm=60)
|
||||
p = score.part("demo", synth="crotales_synth", envelope="none",
|
||||
volume=0.5, reverb=0.3)
|
||||
for n in ["C6", "E6", "G6", "C7", "G6", "E6", "C6"]:
|
||||
p.add(n, Duration.HALF, velocity=80)
|
||||
render("synth_crotales", score)
|
||||
|
||||
|
||||
def gen_synth_tingsha():
|
||||
score = Score("4/4", bpm=40)
|
||||
p = score.part("demo", synth="tingsha_synth", envelope="none",
|
||||
volume=0.5, reverb=0.4)
|
||||
for n in ["E5", "A5", "E6", "A5"]:
|
||||
p.add(n, Duration.WHOLE, velocity=75)
|
||||
render("synth_tingsha", score)
|
||||
|
||||
|
||||
def gen_rainstick():
|
||||
score = Score("4/4", bpm=60)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
p.hit(DrumSound.RAINSTICK, Duration.WHOLE * 3, velocity=90)
|
||||
render("rainstick", score)
|
||||
|
||||
|
||||
def gen_rainstick_slow():
|
||||
score = Score("4/4", bpm=60)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
p.hit(DrumSound.RAINSTICK_SLOW, Duration.WHOLE * 4, velocity=85)
|
||||
render("rainstick_slow", score)
|
||||
|
||||
|
||||
def gen_ocean_drum():
|
||||
score = Score("4/4", bpm=60)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
p.hit(DrumSound.OCEAN_DRUM, Duration.WHOLE * 3, velocity=85)
|
||||
render("ocean_drum", score)
|
||||
|
||||
|
||||
def gen_cabasa():
|
||||
score = Score("4/4", bpm=100)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
for _ in range(16):
|
||||
p.hit(DrumSound.CABASA, Duration.EIGHTH, velocity=100)
|
||||
render("cabasa", score)
|
||||
|
||||
|
||||
def gen_wind_chimes():
|
||||
score = Score("4/4", bpm=60)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
p.hit(DrumSound.WIND_CHIMES, Duration.WHOLE * 3, velocity=85)
|
||||
render("wind_chimes", score)
|
||||
|
||||
|
||||
def gen_finger_cymbal():
|
||||
score = Score("4/4", bpm=80)
|
||||
p = score.part("demo", synth="sine", volume=1.0)
|
||||
for _ in range(8):
|
||||
p.hit(DrumSound.FINGER_CYMBAL, Duration.QUARTER, velocity=85)
|
||||
render("finger_cymbal", score)
|
||||
|
||||
|
||||
def gen_synth_singing_bowl_strike():
|
||||
score = Score("4/4", bpm=40)
|
||||
p = score.part("demo", synth="singing_bowl_strike_synth", envelope="none",
|
||||
volume=0.5, reverb=0.4)
|
||||
for n in ["A3", "D4", "F4", "A4"]:
|
||||
p.add(n, Duration.WHOLE, velocity=80)
|
||||
render("synth_singing_bowl_strike", score)
|
||||
|
||||
|
||||
def gen_synth_singing_bowl_ring():
|
||||
score = Score("4/4", bpm=30)
|
||||
p = score.part("demo", synth="singing_bowl_ring_synth", envelope="none",
|
||||
volume=0.5, reverb=0.4)
|
||||
for n in ["A3", "D4", "A4"]:
|
||||
p.add(n, Duration.WHOLE * 2, velocity=75)
|
||||
render("synth_singing_bowl_ring", score)
|
||||
|
||||
|
||||
def gen_arpeggio():
|
||||
score = Score("4/4", bpm=132)
|
||||
score.drums("house", repeats=8)
|
||||
@@ -1004,6 +1119,10 @@ GENERATORS = [
|
||||
gen_synth_electric_guitar,
|
||||
gen_synth_sitar,
|
||||
gen_synth_kalimba,
|
||||
gen_synth_wurlitzer,
|
||||
gen_synth_vibraphone,
|
||||
gen_synth_pipe_organ,
|
||||
gen_synth_choir,
|
||||
gen_synth_organ,
|
||||
gen_synth_marimba,
|
||||
gen_synth_harp,
|
||||
@@ -1021,6 +1140,16 @@ GENERATORS = [
|
||||
gen_synth_mandolin,
|
||||
gen_synth_ukulele,
|
||||
gen_synth_granular,
|
||||
gen_synth_crotales,
|
||||
gen_synth_tingsha,
|
||||
gen_synth_singing_bowl_strike,
|
||||
gen_synth_singing_bowl_ring,
|
||||
gen_rainstick,
|
||||
gen_rainstick_slow,
|
||||
gen_ocean_drum,
|
||||
gen_cabasa,
|
||||
gen_wind_chimes,
|
||||
gen_finger_cymbal,
|
||||
gen_arpeggio,
|
||||
gen_legato_glide,
|
||||
gen_acid_house,
|
||||
|
||||
+205
-2
@@ -479,6 +479,72 @@ singing sustain. The sound of jazz clubs, soul, and neo-soul.
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_rhodes.wav" type="audio/wav"></audio>
|
||||
|
||||
Wurlitzer Electric Piano
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The Wurlitzer uses a vibrating steel reed (not a tine like Rhodes)
|
||||
picked up by an electrostatic pickup. More nasal, reedy, and biting
|
||||
— it barks and growls when played hard. Think Supertramp, Ray Charles.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
wurli = score.part("wurli", synth="wurlitzer_synth")
|
||||
wurli = score.part("wurli", instrument="wurlitzer")
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_wurlitzer.wav" type="audio/wav"></audio>
|
||||
|
||||
Vibraphone Synth
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
Struck aluminum bars with motor-driven tremolo discs. The spinning
|
||||
motor modulates the sound through the resonator tubes, creating the
|
||||
signature vibraphone shimmer. Inharmonic bar modes at 1x, 2.76x, 5.4x.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
vib = score.part("vib", synth="vibraphone_synth")
|
||||
vib = score.part("vib", instrument="vibraphone")
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_vibraphone.wav" type="audio/wav"></audio>
|
||||
|
||||
Pipe Organ Synth
|
||||
~~~~~~~~~~~~~~~~
|
||||
|
||||
Multiple ranks of pipes — principal 8', octave 4', fifteenth 2'.
|
||||
Constant air pressure means no dynamics. Wind chiff at the attack.
|
||||
Best with cathedral reverb.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
organ = score.part("organ", synth="pipe_organ_synth")
|
||||
organ = score.part("organ", instrument="pipe_organ")
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_pipe_organ.wav" type="audio/wav"></audio>
|
||||
|
||||
Choir Synth
|
||||
~~~~~~~~~~~
|
||||
|
||||
Voices singing vowels shaped by formant bandpass filters. The glottal
|
||||
source is filtered through vocal tract resonances — F1, F2, F3, F4 —
|
||||
which is what makes "ah" sound different from "oo". Use ``lyric=``
|
||||
to control the vowel. Best with ``ensemble=`` for a full section.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
choir = score.part("choir", synth="choir_synth")
|
||||
choir = score.part("choir", instrument="choir") # ensemble=6 + cathedral reverb
|
||||
choir.add("C4", Duration.WHOLE, lyric="ah")
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_choir.wav" type="audio/wav"></audio>
|
||||
|
||||
Bass Guitar Synth
|
||||
~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -864,6 +930,143 @@ Parameters (passed as synth kwargs):
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_granular.wav" type="audio/wav"></audio>
|
||||
|
||||
Crotales
|
||||
~~~~~~~~
|
||||
|
||||
Small tuned bronze discs (antique cymbals) struck with brass mallets.
|
||||
Bright, crystalline, bell-like tone with strong upper harmonics that
|
||||
rings for a long time. Nearly harmonic partials give crotales their
|
||||
penetrating brilliance — they cut through any orchestra.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
crotales = score.part("crotales", synth="crotales_synth", envelope="none",
|
||||
reverb=0.3)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_crotales.wav" type="audio/wav"></audio>
|
||||
|
||||
Tingsha
|
||||
~~~~~~~
|
||||
|
||||
Two small Tibetan cymbals joined by a cord, clashed together. Both discs
|
||||
ring at slightly different frequencies, producing a bright ping with
|
||||
pronounced beating — the wavering interference between the two is the
|
||||
whole character of the sound.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
tingsha = score.part("tingsha", synth="tingsha_synth", envelope="none",
|
||||
reverb=0.4)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_tingsha.wav" type="audio/wav"></audio>
|
||||
|
||||
Singing Bowl (Strike)
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Tibetan/Himalayan singing bowl struck with a mallet. The impact excites
|
||||
inharmonic partials that ring and slowly beat against each other as
|
||||
near-degenerate mode pairs interfere. Higher modes fade quickly, leaving
|
||||
the fundamental shimmering for seconds.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
bowl = score.part("bowl", synth="singing_bowl_strike_synth", envelope="none",
|
||||
reverb=0.4)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_singing_bowl_strike.wav" type="audio/wav"></audio>
|
||||
|
||||
Singing Bowl (Ring)
|
||||
~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
Rim-rubbed singing bowl — the mallet traces the rim, slowly building the
|
||||
fundamental into a sustained, pulsing tone. Upper harmonics shimmer in
|
||||
and out as the bowl resonates.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
bowl = score.part("bowl", synth="singing_bowl_ring_synth", envelope="none",
|
||||
reverb=0.4)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/synth_singing_bowl_ring.wav" type="audio/wav"></audio>
|
||||
|
||||
Rain Stick
|
||||
~~~~~~~~~~
|
||||
|
||||
Cascading pebbles through a cactus tube with internal pins. Two variants:
|
||||
steep angle (fast cascade) and shallow angle (slow trickle).
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
p.hit(DrumSound.RAINSTICK, Duration.WHOLE * 3) # steep — fast cascade
|
||||
p.hit(DrumSound.RAINSTICK_SLOW, Duration.WHOLE * 4) # shallow — gentle trickle
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/rainstick.wav" type="audio/wav"></audio>
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/rainstick_slow.wav" type="audio/wav"></audio>
|
||||
|
||||
Ocean Drum
|
||||
~~~~~~~~~~
|
||||
|
||||
Steel beads rolling inside a frame drum — tilting produces a smooth surf wash.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
p.hit(DrumSound.OCEAN_DRUM, Duration.WHOLE * 3)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/ocean_drum.wav" type="audio/wav"></audio>
|
||||
|
||||
Cabasa
|
||||
~~~~~~
|
||||
|
||||
Metal bead chain scraped against a textured cylinder — brighter and
|
||||
more metallic than a shaker.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
p.hit(DrumSound.CABASA, Duration.EIGHTH)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/cabasa.wav" type="audio/wav"></audio>
|
||||
|
||||
Wind Chimes
|
||||
~~~~~~~~~~~
|
||||
|
||||
Suspended metal tubes struck by hand or breeze. Each tube rings at
|
||||
its own pitch with slight time offsets.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
p.hit(DrumSound.WIND_CHIMES, Duration.WHOLE * 3)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/wind_chimes.wav" type="audio/wav"></audio>
|
||||
|
||||
Finger Cymbal
|
||||
~~~~~~~~~~~~~
|
||||
|
||||
Single small cymbal tap (zill) — bright metallic ping.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
p.hit(DrumSound.FINGER_CYMBAL, Duration.HALF)
|
||||
|
||||
.. raw:: html
|
||||
|
||||
<audio controls style="width:100%;margin:0.3em 0 0.5em"><source src="../_static/audio/finger_cymbal.wav" type="audio/wav"></audio>
|
||||
|
||||
Analog Oscillator Drift
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -919,13 +1122,13 @@ distorted_guitar, orange_crunch, metal_guitar, bass_guitar, upright_bass,
|
||||
harp, sitar, koto, banjo, mandolin, mandola, ukulele
|
||||
|
||||
**World/Exotic**: pedal_steel, theremin, kalimba, steel_drum, didgeridoo,
|
||||
bagpipe
|
||||
bagpipe, singing_bowl, singing_bowl_ring, tingsha
|
||||
|
||||
**Synth**: synth_lead, synth_pad, synth_bass, acid_bass, 808_bass,
|
||||
granular_pad, granular_texture, vocal, choir
|
||||
|
||||
**Percussion**: vibraphone, marimba, xylophone, glockenspiel, tubular_bells,
|
||||
timpani
|
||||
timpani, crotales
|
||||
|
||||
Explicit kwargs override preset defaults:
|
||||
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
"""Play a recorded MIDI file through pytheory's full renderer.
|
||||
|
||||
Takes a MIDI file captured by the live engine and plays it back
|
||||
through the complete synthesis pipeline — with ensemble, effects,
|
||||
reverb, and master compression.
|
||||
|
||||
Usage:
|
||||
python play_recording.py recording.mid
|
||||
python play_recording.py recording.mid --bpm 110
|
||||
"""
|
||||
|
||||
import sys
|
||||
import sounddevice as sd
|
||||
|
||||
from pytheory import Score
|
||||
from pytheory.play import render_score, SAMPLE_RATE
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 2:
|
||||
print(" Usage: python play_recording.py <file.mid> [--bpm N]")
|
||||
return
|
||||
|
||||
filename = sys.argv[1]
|
||||
bpm = None
|
||||
if "--bpm" in sys.argv:
|
||||
idx = sys.argv.index("--bpm")
|
||||
if idx + 1 < len(sys.argv):
|
||||
bpm = int(sys.argv[idx + 1])
|
||||
|
||||
print(f" Loading {filename}...")
|
||||
score = Score.from_midi(filename)
|
||||
|
||||
if bpm:
|
||||
score.bpm = bpm
|
||||
|
||||
print(f" {score}")
|
||||
print(f" Rendering...")
|
||||
|
||||
buf = render_score(score)
|
||||
duration = len(buf) / SAMPLE_RATE
|
||||
|
||||
print(f" Playing ({duration:.1f}s)...")
|
||||
try:
|
||||
sd.play(buf, SAMPLE_RATE)
|
||||
sd.wait()
|
||||
except KeyboardInterrupt:
|
||||
sd.stop()
|
||||
print("\n Stopped.")
|
||||
|
||||
print(" Done.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+2
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "pytheory"
|
||||
version = "0.40.0"
|
||||
version = "0.40.6"
|
||||
description = "Music Theory for Humans"
|
||||
readme = "README.md"
|
||||
license = "MIT"
|
||||
@@ -23,6 +23,7 @@ classifiers = [
|
||||
dependencies = [
|
||||
"sounddevice",
|
||||
"scipy",
|
||||
"rich>=14.3.3",
|
||||
]
|
||||
|
||||
[project.urls]
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
"""PyTheory: Music Theory for Humans."""
|
||||
|
||||
__version__ = "0.40.0"
|
||||
__version__ = "0.40.6"
|
||||
|
||||
from .tones import Tone, Interval
|
||||
from .systems import System, SYSTEMS, TET
|
||||
|
||||
@@ -0,0 +1,826 @@
|
||||
"""Real-time MIDI-driven synthesis engine.
|
||||
|
||||
Listens for MIDI input (e.g. from an OP-XY, keyboard, or DAW) and
|
||||
synthesizes audio in real-time through pytheory's synth engine.
|
||||
|
||||
Usage::
|
||||
|
||||
from pytheory.live import LiveEngine
|
||||
|
||||
engine = LiveEngine()
|
||||
engine.channel(1, instrument="electric_piano")
|
||||
engine.channel(2, instrument="bass_guitar", lowpass=800)
|
||||
engine.channel(10, drums=True)
|
||||
engine.start() # blocks until Ctrl-C
|
||||
|
||||
Note: sustained notes are pre-rendered to a 3-second wavetable.
|
||||
Instruments requiring longer sustain (pads, organ) will cut off
|
||||
after 3 seconds. This is a known limitation of the current
|
||||
wavetable approach.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import numpy
|
||||
import sounddevice as sd
|
||||
|
||||
try:
|
||||
import rtmidi
|
||||
except ImportError:
|
||||
rtmidi = None
|
||||
|
||||
from .play import (
|
||||
_SYNTH_FUNCTIONS, _resolve_synth, _resolve_envelope,
|
||||
_apply_envelope, _apply_lowpass, _render_drum_hit_cached,
|
||||
SAMPLE_RATE, SAMPLE_PEAK,
|
||||
)
|
||||
from .rhythm import INSTRUMENTS, DrumSound
|
||||
|
||||
|
||||
# ── Voice ────────────────────────────────────────────────────────────────
|
||||
|
||||
class _Voice:
|
||||
"""A single sounding note - holds a pre-rendered wavetable and
|
||||
tracks playback position + envelope state."""
|
||||
__slots__ = ('active', 'note', 'pitch_ratio', 'pos', 'release_len',
|
||||
'release_pos', 'releasing', 'velocity', 'wave')
|
||||
|
||||
def __init__(self, wave, velocity, note):
|
||||
self.wave = wave # float32 array - one shot
|
||||
self.pos = 0.0 # current read position (float for pitch bend)
|
||||
self.velocity = velocity
|
||||
self.active = True
|
||||
self.releasing = False
|
||||
self.release_pos = 0
|
||||
self.release_len = int(SAMPLE_RATE * 0.05) # 50ms release
|
||||
self.note = note # MIDI note number
|
||||
self.pitch_ratio = 1.0 # 1.0 = normal, >1 = up, <1 = down
|
||||
|
||||
|
||||
# ── Channel ──────────────────────────────────────────────────────────────
|
||||
|
||||
class _Channel:
|
||||
"""One MIDI channel - has a synth, effects, and a voice pool."""
|
||||
|
||||
def __init__(self, synth_name="sine", envelope_name="piano",
|
||||
is_drums=False, max_voices=12, **kwargs):
|
||||
self.synth_fn = _resolve_synth(synth_name)
|
||||
self.synth_name = synth_name
|
||||
self.envelope_name = envelope_name
|
||||
self.env_tuple = _resolve_envelope(envelope_name)
|
||||
self.is_drums = is_drums
|
||||
self.max_voices = max_voices
|
||||
self.kwargs = kwargs
|
||||
self.lowpass = kwargs.get('lowpass', 0)
|
||||
self.lowpass_q = kwargs.get('lowpass_q', 0.707)
|
||||
self.reverb = kwargs.get('reverb', 0)
|
||||
self.volume = kwargs.get('volume', 0.5)
|
||||
self.pan = kwargs.get('pan', 0.0) # -1 left, 0 center, 1 right
|
||||
self.chorus = kwargs.get('chorus', 0)
|
||||
self.detune = kwargs.get('detune', 0)
|
||||
self.spread = kwargs.get('spread', 0)
|
||||
self.analog = kwargs.get('analog', 0)
|
||||
self.distortion = kwargs.get('distortion', 0)
|
||||
self.delay = kwargs.get('delay', 0)
|
||||
self.tremolo_depth = kwargs.get('tremolo_depth', 0)
|
||||
self.saturation = kwargs.get('saturation', 0)
|
||||
self.phaser = kwargs.get('phaser', 0)
|
||||
self.sub_osc = kwargs.get('sub_osc', 0)
|
||||
self.noise_mix = kwargs.get('noise_mix', 0)
|
||||
|
||||
self.voices = [] # active _Voice objects
|
||||
self._cache = {} # MIDI note -> pre-rendered wave
|
||||
self._lock = threading.Lock()
|
||||
self.level = 0.0 # current output level (for VU meter)
|
||||
|
||||
def _get_wave(self, midi_note, n_samples):
|
||||
"""Get or render a waveform for a MIDI note."""
|
||||
if self.is_drums:
|
||||
return _render_drum_hit_cached(midi_note, n_samples)
|
||||
|
||||
if midi_note in self._cache:
|
||||
cached = self._cache[midi_note]
|
||||
if len(cached) >= n_samples:
|
||||
return cached[:n_samples]
|
||||
|
||||
hz = 440.0 * (2 ** ((midi_note - 69) / 12.0))
|
||||
|
||||
# Synth kwargs
|
||||
skw = {}
|
||||
if self.synth_name in ("fm",):
|
||||
skw["mod_ratio"] = self.kwargs.get("fm_ratio", 2.0)
|
||||
skw["mod_index"] = self.kwargs.get("fm_index", 3.0)
|
||||
|
||||
wave = self.synth_fn(hz, n_samples=n_samples, **skw)
|
||||
wave_f = wave.astype(numpy.float32) / SAMPLE_PEAK
|
||||
|
||||
# Apply envelope
|
||||
a, d, s, r = self.env_tuple
|
||||
if a > 0 or d > 0 or s < 1.0 or r > 0:
|
||||
wave_f = _apply_envelope(wave_f, a, d, s, r)
|
||||
|
||||
# Apply lowpass
|
||||
if self.lowpass > 0:
|
||||
wave_f = _apply_lowpass(wave_f, self.lowpass, q=self.lowpass_q)
|
||||
|
||||
# Apply reverb - simple feedback delay for real-time
|
||||
if self.reverb > 0:
|
||||
wet = self.reverb
|
||||
delay_samples = int(SAMPLE_RATE * 0.03) # 30ms early reflection
|
||||
delay2 = int(SAMPLE_RATE * 0.047) # second tap
|
||||
delay3 = int(SAMPLE_RATE * 0.071) # third tap
|
||||
reverbed = wave_f.copy()
|
||||
for delay, gain in [(delay_samples, 0.4), (delay2, 0.3), (delay3, 0.2)]:
|
||||
if delay < len(reverbed):
|
||||
reverbed[delay:] += wave_f[:-delay] * gain
|
||||
# Feedback loop for tail
|
||||
fb_delay = int(SAMPLE_RATE * 0.05)
|
||||
feedback = 0.35
|
||||
for _ in range(6):
|
||||
if fb_delay < len(reverbed):
|
||||
reverbed[fb_delay:] += reverbed[:-fb_delay] * feedback
|
||||
feedback *= 0.7
|
||||
fb_delay = int(fb_delay * 1.5)
|
||||
wave_f = wave_f * (1.0 - wet) + reverbed * wet
|
||||
|
||||
# Apply distortion/saturation
|
||||
if self.distortion > 0:
|
||||
drive = 3.0
|
||||
wave_f = numpy.tanh(wave_f * drive * (1 + self.distortion * 3)) / drive
|
||||
if self.saturation > 0:
|
||||
wave_f = numpy.tanh(wave_f * (1 + self.saturation * 2))
|
||||
|
||||
# Apply tremolo
|
||||
if self.tremolo_depth > 0:
|
||||
t = numpy.arange(len(wave_f), dtype=numpy.float32) / SAMPLE_RATE
|
||||
trem = 1.0 - self.tremolo_depth * 0.5 * (1 + numpy.sin(2 * numpy.pi * 5.0 * t))
|
||||
wave_f *= trem
|
||||
|
||||
# Apply chorus (simple delay modulation)
|
||||
if self.chorus > 0:
|
||||
t = numpy.arange(len(wave_f), dtype=numpy.float32) / SAMPLE_RATE
|
||||
mod = (numpy.sin(2 * numpy.pi * 1.5 * t) * 0.002 * SAMPLE_RATE).astype(int)
|
||||
chorus_buf = numpy.zeros_like(wave_f)
|
||||
for i in range(len(wave_f)):
|
||||
idx = i - abs(mod[i]) - int(SAMPLE_RATE * 0.015)
|
||||
if 0 <= idx < len(wave_f):
|
||||
chorus_buf[i] = wave_f[idx]
|
||||
wave_f = wave_f * (1 - self.chorus * 0.5) + chorus_buf * self.chorus * 0.5
|
||||
|
||||
self._cache[midi_note] = wave_f
|
||||
return wave_f
|
||||
|
||||
def note_on(self, midi_note, velocity):
|
||||
"""Start a new voice."""
|
||||
vel_scale = velocity / 127.0
|
||||
# Render 3 seconds of audio (extra for reverb tail)
|
||||
n_samples = SAMPLE_RATE * 3
|
||||
wave = self._get_wave(midi_note, n_samples)
|
||||
|
||||
with self._lock:
|
||||
# Voice stealing - kill oldest if at max
|
||||
if len(self.voices) >= self.max_voices:
|
||||
self.voices.pop(0)
|
||||
self.voices.append(_Voice(wave, vel_scale, midi_note))
|
||||
|
||||
def note_off(self, midi_note):
|
||||
"""Trigger release on voices playing this note."""
|
||||
with self._lock:
|
||||
for v in self.voices:
|
||||
if v.note == midi_note and v.active and not v.releasing:
|
||||
v.releasing = True
|
||||
v.release_pos = 0
|
||||
|
||||
def render_stereo(self, n_frames):
|
||||
"""Mix all active voices into a stereo buffer (n_frames, 2)."""
|
||||
mono = numpy.zeros(n_frames, dtype=numpy.float32)
|
||||
dead = []
|
||||
|
||||
with self._lock:
|
||||
for i, v in enumerate(self.voices):
|
||||
if not v.active:
|
||||
dead.append(i)
|
||||
continue
|
||||
|
||||
remaining = len(v.wave) - int(v.pos)
|
||||
chunk = min(n_frames, remaining)
|
||||
|
||||
if chunk <= 0:
|
||||
v.active = False
|
||||
dead.append(i)
|
||||
continue
|
||||
|
||||
# Pitch bend: variable-rate read
|
||||
if abs(v.pitch_ratio - 1.0) > 0.001:
|
||||
read_positions = v.pos + numpy.arange(chunk) * v.pitch_ratio
|
||||
read_positions = numpy.clip(read_positions, 0, len(v.wave) - 2)
|
||||
idx = read_positions.astype(numpy.int64)
|
||||
frac = (read_positions - idx).astype(numpy.float32)
|
||||
samples = (v.wave[idx] * (1 - frac) +
|
||||
v.wave[numpy.minimum(idx + 1, len(v.wave) - 1)] * frac)
|
||||
samples *= v.velocity * self.volume
|
||||
else:
|
||||
int_pos = int(v.pos)
|
||||
samples = v.wave[int_pos:int_pos + chunk] * v.velocity * self.volume
|
||||
|
||||
# Release crossfade
|
||||
if v.releasing:
|
||||
fade_chunk = min(chunk, v.release_len - v.release_pos)
|
||||
if fade_chunk > 0:
|
||||
fade = numpy.linspace(
|
||||
1.0 - v.release_pos / v.release_len,
|
||||
1.0 - (v.release_pos + fade_chunk) / v.release_len,
|
||||
fade_chunk
|
||||
).astype(numpy.float32)
|
||||
samples[:fade_chunk] *= fade
|
||||
v.release_pos += fade_chunk
|
||||
if v.release_pos >= v.release_len:
|
||||
v.active = False
|
||||
samples[fade_chunk:] = 0
|
||||
|
||||
mono[:chunk] += samples
|
||||
v.pos += chunk * v.pitch_ratio
|
||||
|
||||
# Clean up dead voices
|
||||
for i in reversed(dead):
|
||||
if i < len(self.voices):
|
||||
self.voices.pop(i)
|
||||
|
||||
# VU meter
|
||||
peak = numpy.abs(mono).max() if len(mono) > 0 else 0
|
||||
self.level = self.level * 0.7 + peak * 0.3 # smooth
|
||||
|
||||
# Stereo pan (constant power)
|
||||
import math
|
||||
angle = (self.pan + 1) * math.pi / 4 # 0 to pi/2
|
||||
l_gain = math.cos(angle)
|
||||
r_gain = math.sin(angle)
|
||||
stereo = numpy.zeros((n_frames, 2), dtype=numpy.float32)
|
||||
stereo[:, 0] = mono * l_gain
|
||||
stereo[:, 1] = mono * r_gain
|
||||
|
||||
return stereo
|
||||
|
||||
|
||||
# ── LiveEngine ───────────────────────────────────────────────────────────
|
||||
|
||||
class LiveEngine:
|
||||
"""Real-time MIDI-to-audio engine.
|
||||
|
||||
Maps MIDI channels to pytheory instruments and synthesizes
|
||||
audio in real-time via sounddevice.
|
||||
|
||||
Example::
|
||||
|
||||
engine = LiveEngine()
|
||||
engine.channel(1, instrument="electric_piano")
|
||||
engine.channel(2, instrument="bass_guitar")
|
||||
engine.channel(10, drums=True)
|
||||
engine.start()
|
||||
"""
|
||||
|
||||
def __init__(self, buffer_size=512, sample_rate=SAMPLE_RATE):
|
||||
self.buffer_size = buffer_size
|
||||
self.sample_rate = sample_rate
|
||||
self.channels = {} # MIDI channel (1-16) -> _Channel
|
||||
self._cc_map = {} # (channel, cc_number) -> (param_name, min, max)
|
||||
self._midi_in = None
|
||||
self._stream = None
|
||||
self._stop_event = threading.Event()
|
||||
# Recording
|
||||
self._recording = False
|
||||
self._record_events = [] # (timestamp, ch, note, velocity, on/off)
|
||||
self._record_start = 0
|
||||
# Keyboard MIDI
|
||||
self._keyboard_channel = None
|
||||
self._keyboard_octave = 4
|
||||
# Clock sync
|
||||
self._clock_count = 0 # MIDI clock pulses (24 per quarter note)
|
||||
self._clock_times = [] # timestamps for BPM calculation
|
||||
self._bpm = 120.0
|
||||
self._playing = False
|
||||
# Drum pattern
|
||||
self._drum_pattern = None
|
||||
self._drum_channel = None
|
||||
|
||||
def channel(self, ch, *, instrument=None, synth=None, envelope=None,
|
||||
drums=False, **kwargs):
|
||||
"""Configure a MIDI channel.
|
||||
|
||||
Args:
|
||||
ch: MIDI channel number (1-16). Channel 10 = drums by convention.
|
||||
instrument: Instrument preset name (e.g. "electric_piano").
|
||||
synth: Synth waveform name (overrides instrument).
|
||||
envelope: Envelope name (overrides instrument).
|
||||
drums: If True, this channel triggers drum sounds by MIDI note.
|
||||
**kwargs: Any Part parameter (lowpass, reverb, volume, etc.)
|
||||
"""
|
||||
if not isinstance(ch, int) or not (1 <= ch <= 16):
|
||||
raise ValueError(f"MIDI channel must be an integer 1-16, got {ch!r}")
|
||||
|
||||
# Build params from instrument preset
|
||||
params = {}
|
||||
if instrument:
|
||||
preset = INSTRUMENTS.get(instrument)
|
||||
if preset:
|
||||
params.update(preset)
|
||||
if synth:
|
||||
params["synth"] = synth
|
||||
if envelope:
|
||||
params["envelope"] = envelope
|
||||
params.update(kwargs)
|
||||
|
||||
synth_name = params.pop("synth", "sine")
|
||||
env_name = params.pop("envelope", "piano")
|
||||
|
||||
self.channels[ch] = _Channel(
|
||||
synth_name=synth_name,
|
||||
envelope_name=env_name,
|
||||
is_drums=drums or ch == 10,
|
||||
**params,
|
||||
)
|
||||
return self
|
||||
|
||||
def drums(self, pattern_name, *, volume=0.5):
|
||||
"""Add a drum pattern that syncs to MIDI clock.
|
||||
|
||||
The pattern plays in sync with the OP-XY's transport -
|
||||
starts on Start, stops on Stop, tempo from MIDI clock.
|
||||
|
||||
Args:
|
||||
pattern_name: Drum pattern preset name (e.g. "rock", "house").
|
||||
volume: Drum volume (0.0-1.0).
|
||||
"""
|
||||
from .rhythm import Pattern
|
||||
self._drum_pattern = Pattern.preset(pattern_name)
|
||||
self._drum_channel = _Channel(synth_name="sine", is_drums=True,
|
||||
volume=volume)
|
||||
return self
|
||||
|
||||
def cc(self, cc_number, param, *, min_val=0.0, max_val=1.0, ch=None):
|
||||
"""Map a MIDI CC to a channel parameter.
|
||||
|
||||
Args:
|
||||
cc_number: MIDI CC number (0-127).
|
||||
param: Parameter name ("volume", "lowpass", "reverb", etc.)
|
||||
min_val: Value when CC = 0.
|
||||
max_val: Value when CC = 127.
|
||||
ch: MIDI channel (None = all channels).
|
||||
|
||||
Example::
|
||||
|
||||
>>> engine.cc(11, "lowpass", min_val=200, max_val=8000)
|
||||
>>> engine.cc(12, "volume", min_val=0.0, max_val=1.0)
|
||||
>>> engine.cc(13, "reverb", min_val=0.0, max_val=0.8)
|
||||
>>> engine.cc(14, "distortion", min_val=0.0, max_val=0.8)
|
||||
"""
|
||||
self._cc_map[(ch, cc_number)] = (param, min_val, max_val)
|
||||
return self
|
||||
|
||||
def _apply_cc(self, ch, cc_number, value):
|
||||
"""Apply a CC value to the matching channel parameter."""
|
||||
for key in [(ch, cc_number), (None, cc_number)]:
|
||||
if key in self._cc_map:
|
||||
param, min_val, max_val = self._cc_map[key]
|
||||
scaled = min_val + (max_val - min_val) * (value / 127.0)
|
||||
|
||||
target_chs = [ch] if key[0] is not None else list(self.channels.keys())
|
||||
for target_ch in target_chs:
|
||||
if target_ch in self.channels:
|
||||
channel = self.channels[target_ch]
|
||||
if param == "volume":
|
||||
channel.volume = scaled
|
||||
elif param == "lowpass":
|
||||
channel.lowpass = scaled
|
||||
channel._cache.clear()
|
||||
elif param == "reverb":
|
||||
channel.reverb = scaled
|
||||
channel._cache.clear()
|
||||
elif hasattr(channel, param):
|
||||
setattr(channel, param, scaled)
|
||||
channel._cache.clear()
|
||||
print(f" CC {cc_number}: {param}={scaled:.2f}")
|
||||
return
|
||||
|
||||
def _on_clock(self):
|
||||
"""Handle MIDI clock pulse (24 per quarter note)."""
|
||||
import time as _time
|
||||
|
||||
if not self._playing:
|
||||
return
|
||||
|
||||
now = _time.perf_counter()
|
||||
self._clock_times.append(now)
|
||||
if len(self._clock_times) > 240:
|
||||
self._clock_times = self._clock_times[-240:]
|
||||
# Only update BPM every 24 ticks to avoid jitter
|
||||
if self._clock_count % 24 == 0 and len(self._clock_times) >= 48:
|
||||
# Use as many ticks as we have for best accuracy
|
||||
n = min(len(self._clock_times), 240)
|
||||
total_time = self._clock_times[-1] - self._clock_times[-n]
|
||||
if total_time > 0:
|
||||
ticks = n - 1
|
||||
self._bpm = round(60.0 * ticks / (24.0 * total_time))
|
||||
|
||||
# Trigger drum hits at the right time
|
||||
if self._drum_pattern and self._drum_channel:
|
||||
pattern = self._drum_pattern
|
||||
beat_pos = self._clock_count / 24.0
|
||||
pattern_beat = beat_pos % pattern.beats
|
||||
beat_resolution = 1.0 / 24.0
|
||||
for hit in pattern.hits:
|
||||
if abs(hit.position - pattern_beat) < beat_resolution / 2:
|
||||
self._drum_channel.note_on(hit.sound.value, hit.velocity)
|
||||
|
||||
self._clock_count += 1
|
||||
|
||||
def _all_notes_off(self):
|
||||
"""Kill all sounding voices on all channels."""
|
||||
for channel in self.channels.values():
|
||||
with channel._lock:
|
||||
channel.voices.clear()
|
||||
if self._drum_channel:
|
||||
with self._drum_channel._lock:
|
||||
self._drum_channel.voices.clear()
|
||||
|
||||
def _midi_callback(self, event, data=None):
|
||||
"""Handle incoming MIDI messages."""
|
||||
msg, _ = event
|
||||
if len(msg) == 0:
|
||||
return
|
||||
|
||||
# System realtime messages (1 byte)
|
||||
if msg[0] == 0xF8: # Clock - 24 ppqn
|
||||
self._on_clock()
|
||||
return
|
||||
elif msg[0] == 0xFA: # Start
|
||||
print(" > Start")
|
||||
self._playing = True
|
||||
self._clock_count = 0
|
||||
return
|
||||
elif msg[0] == 0xFC: # Stop
|
||||
print(" [] Stop")
|
||||
self._playing = False
|
||||
self._all_notes_off()
|
||||
return
|
||||
elif msg[0] == 0xFB: # Continue
|
||||
print(" > Continue")
|
||||
self._playing = True
|
||||
return
|
||||
|
||||
if len(msg) < 3:
|
||||
return
|
||||
|
||||
status = msg[0]
|
||||
ch = (status & 0x0F) + 1
|
||||
msg_type = status & 0xF0
|
||||
|
||||
if ch not in self.channels:
|
||||
return
|
||||
|
||||
channel = self.channels[ch]
|
||||
note = msg[1]
|
||||
velocity = msg[2]
|
||||
|
||||
if msg_type == 0x90 and velocity > 0:
|
||||
channel.note_on(note, velocity)
|
||||
if self._recording:
|
||||
import time as _t
|
||||
self._record_events.append(
|
||||
(_t.perf_counter() - self._record_start, ch, note, velocity, True))
|
||||
elif msg_type == 0x80 or (msg_type == 0x90 and velocity == 0):
|
||||
channel.note_off(note)
|
||||
if self._recording:
|
||||
import time as _t
|
||||
self._record_events.append(
|
||||
(_t.perf_counter() - self._record_start, ch, note, 0, False))
|
||||
elif msg_type == 0xB0:
|
||||
self._apply_cc(ch, note, velocity)
|
||||
elif msg_type == 0xE0:
|
||||
bend_raw = (msg[2] << 7) | msg[1]
|
||||
bend_semitones = (bend_raw - 8192) / 8192.0 * 2.0
|
||||
ratio = 2.0 ** (bend_semitones / 12.0)
|
||||
with channel._lock:
|
||||
for v in channel.voices:
|
||||
if v.active:
|
||||
v.pitch_ratio = ratio
|
||||
|
||||
def _audio_callback(self, outdata, frames, time_info, status):
|
||||
"""sounddevice callback - mix all channels to stereo."""
|
||||
stereo = numpy.zeros((frames, 2), dtype=numpy.float32)
|
||||
for channel in self.channels.values():
|
||||
stereo += channel.render_stereo(frames)
|
||||
if self._drum_channel:
|
||||
stereo += self._drum_channel.render_stereo(frames)
|
||||
|
||||
# Soft clip per channel
|
||||
stereo[:, 0] = numpy.tanh(stereo[:, 0])
|
||||
stereo[:, 1] = numpy.tanh(stereo[:, 1])
|
||||
outdata[:] = stereo
|
||||
|
||||
def list_ports(self):
|
||||
"""List available MIDI input ports."""
|
||||
if rtmidi is None:
|
||||
raise ImportError("python-rtmidi required. Install with: pip install pytheory[live]")
|
||||
midi_in = rtmidi.MidiIn()
|
||||
ports = midi_in.get_ports()
|
||||
for i, name in enumerate(ports):
|
||||
print(f" {i}: {name}")
|
||||
midi_in.delete()
|
||||
return ports
|
||||
|
||||
def start(self, port=None):
|
||||
"""Start the engine - opens MIDI input and audio output.
|
||||
|
||||
Args:
|
||||
port: MIDI port index or name. None = first available.
|
||||
|
||||
Blocks until Ctrl-C or stop() is called.
|
||||
"""
|
||||
if rtmidi is None:
|
||||
raise ImportError(
|
||||
"python-rtmidi is required for live MIDI. "
|
||||
"Install it with: pip install pytheory[live]"
|
||||
)
|
||||
|
||||
if not self.channels:
|
||||
self.channel(1, instrument="electric_piano")
|
||||
|
||||
# Pre-compute wavetables
|
||||
print(" Pre-rendering wavetables...")
|
||||
n_samples = SAMPLE_RATE * 3
|
||||
for _, channel in self.channels.items():
|
||||
if channel.is_drums:
|
||||
continue
|
||||
for midi_note in range(36, 97):
|
||||
channel._get_wave(midi_note, n_samples)
|
||||
print(f" Cached {sum(len(c._cache) for c in self.channels.values())} wavetables.")
|
||||
print()
|
||||
|
||||
# Open MIDI
|
||||
self._midi_in = rtmidi.MidiIn()
|
||||
ports = self._midi_in.get_ports()
|
||||
|
||||
if not ports:
|
||||
print(" No MIDI input ports found. (Keyboard mode still works)")
|
||||
self._midi_in.delete()
|
||||
self._midi_in = None
|
||||
# Don't return — still start audio for keyboard mode
|
||||
|
||||
port_name = "none"
|
||||
if self._midi_in and ports:
|
||||
if port is None:
|
||||
port = 0
|
||||
elif isinstance(port, str):
|
||||
matched = False
|
||||
for i, name in enumerate(ports):
|
||||
if port.lower() in name.lower():
|
||||
port = i
|
||||
matched = True
|
||||
break
|
||||
if not matched:
|
||||
self._midi_in.delete()
|
||||
self._midi_in = None
|
||||
print(f" MIDI port not found: {port!r}, continuing without MIDI")
|
||||
port = None
|
||||
|
||||
if self._midi_in and port is not None:
|
||||
try:
|
||||
self._midi_in.open_port(port)
|
||||
self._midi_in.ignore_types(sysex=True, timing=False, active_sense=True)
|
||||
self._midi_in.set_callback(self._midi_callback)
|
||||
port_name = ports[port]
|
||||
except Exception:
|
||||
self._midi_in.delete()
|
||||
self._midi_in = None
|
||||
print(" Failed to open MIDI port, continuing without MIDI")
|
||||
|
||||
print(f" PyTheory Live Engine")
|
||||
print(f" MIDI: {port_name}")
|
||||
print(f" Buffer: {self.buffer_size} samples ({self.buffer_size/self.sample_rate*1000:.1f}ms)")
|
||||
print(f" Channels:")
|
||||
for ch, channel in sorted(self.channels.items()):
|
||||
kind = "drums" if channel.is_drums else channel.synth_name
|
||||
print(f" {ch:2d}: {kind} (vol={channel.volume})")
|
||||
if self._drum_pattern:
|
||||
print(f" Drums: {self._drum_pattern.name} (synced to MIDI clock)")
|
||||
print()
|
||||
print(" Playing... (Ctrl-C to stop)")
|
||||
print()
|
||||
|
||||
self._stream = sd.OutputStream(
|
||||
samplerate=self.sample_rate,
|
||||
blocksize=self.buffer_size,
|
||||
channels=2,
|
||||
dtype='float32',
|
||||
callback=self._audio_callback,
|
||||
)
|
||||
|
||||
try:
|
||||
self._stream.start()
|
||||
self._stop_event.clear()
|
||||
self._stop_event.wait()
|
||||
except KeyboardInterrupt:
|
||||
print("\n Stopped.")
|
||||
finally:
|
||||
if self._stream:
|
||||
self._stream.stop()
|
||||
self._stream.close()
|
||||
self._stream = None
|
||||
if self._midi_in:
|
||||
self._midi_in.close_port()
|
||||
self._midi_in.delete()
|
||||
self._midi_in = None
|
||||
|
||||
def keyboard_play(self, ch=1):
|
||||
"""Enable computer keyboard as MIDI input on a channel."""
|
||||
self._keyboard_channel = ch
|
||||
return self
|
||||
|
||||
def keyboard_note(self, key, on=True):
|
||||
"""Translate a keyboard key to a MIDI note and play it.
|
||||
|
||||
QWERTY layout: ZSXDCVGBHNJM = C through B (lower octave)
|
||||
Q2W3ER5T6Y7U = C through B (upper octave)
|
||||
"""
|
||||
# Chromatic layout across the full keyboard
|
||||
# Bottom two rows = lower octave range
|
||||
# Top two rows = upper octave range (+1 octave)
|
||||
# Black keys on the row above their white keys
|
||||
#
|
||||
# Row 3 (ZXCVBNM,./): white keys C D E F G A B C D E
|
||||
# Row 2 (ASDFGHJKL;'): black keys + extras
|
||||
# Row 1 (QWERTYUIOP[]): white keys C D E F G A B C D E F G
|
||||
# Row 0 (1234567890-=): black keys + extras
|
||||
lower = {
|
||||
# White keys: Z X C V B N M , . /
|
||||
'z': 0, 'x': 2, 'c': 4, 'v': 5, 'b': 7, 'n': 9, 'm': 11,
|
||||
',': 12, '.': 14, '/': 16,
|
||||
# Black keys: S D G H J L ;
|
||||
's': 1, 'd': 3, 'g': 6, 'h': 8, 'j': 10,
|
||||
'l': 13, ';': 15,
|
||||
# Extras
|
||||
'a': 0, 'f': 4, 'k': 11, "'": 17,
|
||||
}
|
||||
upper = {
|
||||
# White keys: Q W E R T Y U I O P [ ]
|
||||
'q': 0, 'w': 2, 'e': 4, 'r': 5, 't': 7, 'y': 9, 'u': 11,
|
||||
'i': 12, 'o': 14, 'p': 16, '[': 17, ']': 19,
|
||||
# Black keys: 2 3 5 6 7 9 0
|
||||
'2': 1, '3': 3, '5': 6, '6': 8, '7': 10,
|
||||
'9': 13, '0': 15,
|
||||
# Extras
|
||||
'1': 0, '4': 4, '8': 11, '-': 18, '=': 19,
|
||||
}
|
||||
|
||||
if self._keyboard_channel is None:
|
||||
return False
|
||||
|
||||
ch = self._keyboard_channel
|
||||
if ch not in self.channels:
|
||||
return False
|
||||
|
||||
midi_note = None
|
||||
if key in lower:
|
||||
midi_note = (self._keyboard_octave + 1) * 12 + lower[key]
|
||||
elif key in upper:
|
||||
midi_note = (self._keyboard_octave + 2) * 12 + upper[key]
|
||||
|
||||
if midi_note is not None:
|
||||
channel = self.channels[ch]
|
||||
if on:
|
||||
channel.note_on(midi_note, 100)
|
||||
if self._recording:
|
||||
import time as _t
|
||||
self._record_events.append(
|
||||
(_t.perf_counter() - self._record_start,
|
||||
ch, midi_note, 100, True))
|
||||
else:
|
||||
channel.note_off(midi_note)
|
||||
if self._recording:
|
||||
import time as _t
|
||||
self._record_events.append(
|
||||
(_t.perf_counter() - self._record_start,
|
||||
ch, midi_note, 0, False))
|
||||
return True
|
||||
return False
|
||||
|
||||
def start_recording(self):
|
||||
"""Start recording MIDI events."""
|
||||
import time as _t
|
||||
self._record_events = []
|
||||
self._record_start = _t.perf_counter()
|
||||
self._recording = True
|
||||
|
||||
def stop_recording(self):
|
||||
"""Stop recording."""
|
||||
self._recording = False
|
||||
|
||||
def export_recording(self, filename="recording.mid", bpm=None):
|
||||
"""Export recorded events to a MIDI file.
|
||||
|
||||
Returns a pytheory Score if no filename given.
|
||||
"""
|
||||
if not self._record_events:
|
||||
return None
|
||||
|
||||
use_bpm = bpm or (self._bpm if self._bpm > 10 else 120)
|
||||
|
||||
from .rhythm import Score, Duration
|
||||
|
||||
score = Score("4/4", bpm=int(use_bpm))
|
||||
|
||||
# Group events by channel
|
||||
by_channel = {}
|
||||
for ts, ch, note, vel, is_on in self._record_events:
|
||||
if ch not in by_channel:
|
||||
by_channel[ch] = []
|
||||
by_channel[ch].append((ts, note, vel, is_on))
|
||||
|
||||
# Build parts
|
||||
for ch, events in sorted(by_channel.items()):
|
||||
inst = self.picks[ch - 1] if 1 <= ch <= 8 else "piano"
|
||||
part = score.part(f"ch{ch}", instrument=inst)
|
||||
|
||||
# Convert to note-on/off pairs
|
||||
active = {}
|
||||
notes = []
|
||||
for ts, note, vel, is_on in events:
|
||||
if is_on:
|
||||
active[note] = (ts, vel)
|
||||
elif note in active:
|
||||
start_ts, start_vel = active.pop(note)
|
||||
dur_sec = ts - start_ts
|
||||
dur_beats = dur_sec * use_bpm / 60.0
|
||||
notes.append((start_ts, note, max(0.125, dur_beats), start_vel))
|
||||
|
||||
notes.sort(key=lambda x: x[0])
|
||||
|
||||
beat_pos = 0.0
|
||||
for ts, midi_note, dur, vel in notes:
|
||||
note_beat = ts * use_bpm / 60.0
|
||||
if note_beat > beat_pos:
|
||||
part.rest(note_beat - beat_pos)
|
||||
# Convert MIDI note to name
|
||||
name = NOTE_NAMES[midi_note % 12]
|
||||
octave = midi_note // 12 - 1
|
||||
part.add(f"{name}{octave}", dur, velocity=vel)
|
||||
beat_pos = note_beat + dur
|
||||
|
||||
if filename:
|
||||
score.save_midi(filename)
|
||||
|
||||
return score
|
||||
|
||||
def save_config(self, filename):
|
||||
"""Save current configuration to JSON."""
|
||||
import json
|
||||
config = {
|
||||
"seed": self.seed if hasattr(self, 'seed') else None,
|
||||
"buffer_size": self.buffer_size,
|
||||
"drums": getattr(self, '_drum_pattern_name', None),
|
||||
"channels": {},
|
||||
}
|
||||
for ch, channel in self.channels.items():
|
||||
config["channels"][str(ch)] = {
|
||||
"synth": channel.synth_name,
|
||||
"envelope": channel.envelope_name,
|
||||
"volume": channel.volume,
|
||||
"pan": channel.pan,
|
||||
"reverb": channel.reverb,
|
||||
"lowpass": channel.lowpass,
|
||||
"chorus": channel.chorus,
|
||||
"distortion": channel.distortion,
|
||||
"is_drums": channel.is_drums,
|
||||
}
|
||||
with open(filename, 'w') as f:
|
||||
json.dump(config, f, indent=2)
|
||||
|
||||
def load_config(self, filename):
|
||||
"""Load configuration from JSON."""
|
||||
import json
|
||||
with open(filename) as f:
|
||||
config = json.load(f)
|
||||
for ch_str, ch_cfg in config.get("channels", {}).items():
|
||||
ch = int(ch_str)
|
||||
self.channel(ch,
|
||||
synth=ch_cfg.get("synth"),
|
||||
envelope=ch_cfg.get("envelope"),
|
||||
drums=ch_cfg.get("is_drums", False),
|
||||
volume=ch_cfg.get("volume", 0.5),
|
||||
pan=ch_cfg.get("pan", 0.0),
|
||||
reverb=ch_cfg.get("reverb", 0),
|
||||
lowpass=ch_cfg.get("lowpass", 0),
|
||||
chorus=ch_cfg.get("chorus", 0),
|
||||
distortion=ch_cfg.get("distortion", 0))
|
||||
if config.get("drums"):
|
||||
self.drums(config["drums"])
|
||||
|
||||
def stop(self):
|
||||
"""Stop the engine."""
|
||||
self._stop_event.set()
|
||||
if self._stream:
|
||||
self._stream.stop()
|
||||
if self._midi_in:
|
||||
self._midi_in.close_port()
|
||||
self._midi_in.delete()
|
||||
self._midi_in = None
|
||||
@@ -0,0 +1,819 @@
|
||||
"""PyTheory Live — interactive MIDI synthesizer with TUI."""
|
||||
|
||||
import curses
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
from pytheory.live import LiveEngine
|
||||
from pytheory.rhythm import INSTRUMENTS, Pattern
|
||||
|
||||
|
||||
NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
|
||||
|
||||
|
||||
def note_name(midi):
|
||||
return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}"
|
||||
|
||||
|
||||
class LiveTUI:
|
||||
def __init__(self, seed=None, port="OP-XY", n_channels=8,
|
||||
drum_pattern="rock", buffer_size=128):
|
||||
self.seed = seed or random.randint(0, 9999)
|
||||
self.port = port
|
||||
self.n_channels = n_channels
|
||||
self.buffer_size = buffer_size
|
||||
self.engine = None
|
||||
self.log_lines = []
|
||||
self.max_log = 500
|
||||
self.running = True
|
||||
self.instruments = sorted([k for k in INSTRUMENTS.keys()
|
||||
if k not in ("808_bass",)])
|
||||
self.drum_patterns = sorted(Pattern.list_presets())
|
||||
self.current_drum = drum_pattern
|
||||
self.picks = []
|
||||
self.bpm = "—"
|
||||
self.status = "Init"
|
||||
self.status_color = 3
|
||||
self._build_engine()
|
||||
|
||||
def _build_engine(self):
|
||||
rng = random.Random(self.seed)
|
||||
self.picks = rng.sample(self.instruments, min(self.n_channels, len(self.instruments)))
|
||||
self.engine = LiveEngine(buffer_size=self.buffer_size)
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
self.engine.channel(i, instrument=inst, reverb=0.3)
|
||||
if self.current_drum and self.current_drum not in ("none", "-"):
|
||||
self.engine.drums(self.current_drum, volume=0.5)
|
||||
self.engine.cc(0, "lowpass", min_val=300, max_val=8000)
|
||||
|
||||
def log(self, msg, color=0):
|
||||
self.log_lines.append((time.time(), msg, color))
|
||||
if len(self.log_lines) > self.max_log:
|
||||
self.log_lines = self.log_lines[-self.max_log:]
|
||||
|
||||
def _patch_engine_logging(self):
|
||||
original_cb = self.engine._midi_callback
|
||||
|
||||
def logging_cb(event, data=None):
|
||||
msg, _ = event
|
||||
if len(msg) == 0:
|
||||
return
|
||||
if msg[0] == 0xF8:
|
||||
if self.engine._bpm > 10:
|
||||
self.bpm = f"{self.engine._bpm:.0f}"
|
||||
elif msg[0] == 0xFA:
|
||||
self.log("▶ Start", 5)
|
||||
self.status = "Playing"
|
||||
self.status_color = 1
|
||||
elif msg[0] == 0xFC:
|
||||
self.log("■ Stop", 4)
|
||||
self.status = "Stopped"
|
||||
self.status_color = 4
|
||||
elif msg[0] == 0xFB:
|
||||
self.log("▶ Continue", 5)
|
||||
self.status = "Playing"
|
||||
self.status_color = 1
|
||||
elif len(msg) >= 3:
|
||||
status = msg[0]
|
||||
ch = (status & 0x0F) + 1
|
||||
msg_type = status & 0xF0
|
||||
if msg_type == 0x90 and msg[2] > 0:
|
||||
inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?"
|
||||
self.log(f"♪ {ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1)
|
||||
elif msg_type == 0xB0:
|
||||
self.log(f"⚙ CC{msg[1]}={msg[2]}", 3)
|
||||
elif msg_type == 0xE0:
|
||||
bend = ((msg[2] << 7) | msg[1]) - 8192
|
||||
self.log(f"↕ Bend ch{ch} {bend:+d}", 3)
|
||||
original_cb(event, data)
|
||||
|
||||
self.engine._midi_callback = logging_cb
|
||||
|
||||
def run(self, stdscr):
|
||||
curses.curs_set(0)
|
||||
curses.use_default_colors()
|
||||
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
||||
curses.init_pair(2, curses.COLOR_CYAN, -1)
|
||||
curses.init_pair(3, curses.COLOR_YELLOW, -1)
|
||||
curses.init_pair(4, curses.COLOR_RED, -1)
|
||||
curses.init_pair(5, curses.COLOR_MAGENTA, -1)
|
||||
curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE)
|
||||
curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN)
|
||||
curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW)
|
||||
curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED)
|
||||
stdscr.nodelay(True)
|
||||
stdscr.timeout(60)
|
||||
|
||||
self._patch_engine_logging()
|
||||
|
||||
# Pre-render with progress
|
||||
self.status = "Rendering"
|
||||
self.status_color = 3
|
||||
stdscr.erase()
|
||||
stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD)
|
||||
stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3))
|
||||
stdscr.refresh()
|
||||
|
||||
n_samples = 44100 * 3
|
||||
count = 0
|
||||
for _, channel in self.engine.channels.items():
|
||||
if channel.is_drums:
|
||||
continue
|
||||
for midi_note in range(36, 97):
|
||||
channel._get_wave(midi_note, n_samples)
|
||||
count += 1
|
||||
if count % 50 == 0:
|
||||
stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2))
|
||||
stdscr.refresh()
|
||||
|
||||
self.log(f"Cached {count} wavetables", 1)
|
||||
self.log("Starting engine...", 3)
|
||||
self.status = "Starting"
|
||||
self.status_color = 3
|
||||
|
||||
# Start engine
|
||||
def run_engine():
|
||||
import io
|
||||
capture = io.StringIO()
|
||||
old = sys.stdout
|
||||
sys.stdout = capture
|
||||
try:
|
||||
self.engine.start(port=self.port)
|
||||
except Exception as e:
|
||||
self.log(f"Engine error: {e}", 4)
|
||||
finally:
|
||||
sys.stdout = old
|
||||
output = capture.getvalue()
|
||||
if output.strip():
|
||||
for line in output.strip().split('\n'):
|
||||
self.log(line.strip(), 2)
|
||||
|
||||
engine_thread = threading.Thread(target=run_engine, daemon=True)
|
||||
engine_thread.start()
|
||||
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
cmd_history = []
|
||||
history_idx = -1
|
||||
tab_matches = []
|
||||
tab_idx = -1
|
||||
tab_prefix = ""
|
||||
self.kbd_active = False
|
||||
self._kbd_held = {} # key → last_press_time
|
||||
self._picker = None # {"channel": int, "index": int, "scroll": int, "filter": str}
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
# Update status from engine state
|
||||
if (self.engine._stream and self.engine._stream.active
|
||||
and self.status == "Starting"):
|
||||
self.status = "Listening"
|
||||
self.status_color = 2
|
||||
self.log("Audio stream active", 1)
|
||||
|
||||
h, w = stdscr.getmaxyx()
|
||||
if h < 10 or w < 40:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
stdscr.erase()
|
||||
|
||||
div = max(30, w * 3 // 5)
|
||||
cfg_x = div + 2
|
||||
|
||||
# ═══ HEADER BAR ═══
|
||||
header = f" PyTheory Live "
|
||||
stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD)
|
||||
|
||||
# Status badge
|
||||
badge_colors = {1: 7, 2: 6, 3: 8, 4: 9}
|
||||
badge_cp = badge_colors.get(self.status_color, 6)
|
||||
badge = f" {self.status} "
|
||||
x = len(header)
|
||||
stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD)
|
||||
x += len(badge)
|
||||
|
||||
kbd_mode = ""
|
||||
if self.engine._keyboard_channel:
|
||||
kbd_mode = f" KBD:ch{self.engine._keyboard_channel}"
|
||||
rec_mode = " ●REC" if self.engine._recording else ""
|
||||
info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}{kbd_mode}{rec_mode}"
|
||||
try:
|
||||
stdscr.addstr(0, x, info[:w - x], curses.color_pair(6))
|
||||
# Fill rest of header
|
||||
remaining = w - x - len(info)
|
||||
if remaining > 0:
|
||||
stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6))
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ DIVIDER ═══
|
||||
for y in range(1, h - 2):
|
||||
try:
|
||||
stdscr.addch(y, div, '│', curses.color_pair(2) | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ LEFT: EVENTS ═══
|
||||
try:
|
||||
stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
log_h = h - 5
|
||||
visible = self.log_lines[-log_h:] if log_h > 0 else []
|
||||
for i, (ts, msg, color) in enumerate(visible):
|
||||
ly = 2 + i
|
||||
if ly >= h - 3:
|
||||
break
|
||||
# Fade old messages
|
||||
age = time.time() - ts
|
||||
attr = curses.A_DIM if age > 8 else 0
|
||||
try:
|
||||
stdscr.addstr(ly, 1, msg[:div - 2],
|
||||
curses.color_pair(color) | attr)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ RIGHT: CONFIG ═══
|
||||
try:
|
||||
stdscr.addstr(1, cfg_x, " Config ",
|
||||
curses.color_pair(2) | curses.A_BOLD)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
y = 3
|
||||
rw = w - cfg_x - 1
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM)
|
||||
stdscr.addstr(y, cfg_x + 2, inst[:rw - 2],
|
||||
curses.color_pair(1))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
# VU meters per channel
|
||||
y += 1
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
if i in self.engine.channels:
|
||||
lv = self.engine.channels[i].level
|
||||
bars = int(min(lv, 1.0) * 16)
|
||||
meter = "|" * bars + "-" * (16 - bars)
|
||||
color = 1 if bars < 15 else 3 if bars < 18 else 4
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
rec_indicator = " ● REC " if self.engine._recording else ""
|
||||
kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else ""
|
||||
|
||||
pairs = [
|
||||
("Drums", self.current_drum, 5),
|
||||
("BPM", str(self.bpm), 0),
|
||||
("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0),
|
||||
("MIDI", self.port, 0),
|
||||
("Seed", str(self.seed), 2),
|
||||
]
|
||||
if rec_indicator:
|
||||
pairs.append(("", rec_indicator, 4))
|
||||
if kbd_indicator:
|
||||
pairs.append(("", kbd_indicator, 2))
|
||||
for label, val, cp in pairs:
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw],
|
||||
curses.color_pair(cp))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
cmds = [
|
||||
"ch <n> [inst]",
|
||||
"fx <n> <param> <val>",
|
||||
"pan <n> <-1..1>",
|
||||
"drums [pattern|-]",
|
||||
"kbd [ch] [oct]",
|
||||
"rec / stop / export",
|
||||
"save/load <file>",
|
||||
"seed [n] / list",
|
||||
"exit",
|
||||
]
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3))
|
||||
except curses.error:
|
||||
pass
|
||||
for i, c in enumerate(cmds):
|
||||
try:
|
||||
stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw],
|
||||
curses.color_pair(2) | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ INPUT BAR ═══
|
||||
iy = h - 2
|
||||
try:
|
||||
stdscr.addstr(iy - 1, 0, "─" * (w - 1), curses.A_DIM)
|
||||
if self.kbd_active:
|
||||
stdscr.addstr(iy, 0, " KEYBOARD ",
|
||||
curses.color_pair(7) | curses.A_BOLD)
|
||||
stdscr.addstr(iy, 10, " Esc=exit Up/Down=octave ",
|
||||
curses.color_pair(3))
|
||||
else:
|
||||
stdscr.addstr(iy, 0, " $ ",
|
||||
curses.color_pair(1) | curses.A_BOLD)
|
||||
stdscr.addstr(iy, 3, cmd_buf[:w - 5])
|
||||
# Cursor at position
|
||||
cx = 3 + cursor_pos
|
||||
if cx < w - 1:
|
||||
# Show character under cursor inverted, or block at end
|
||||
if cursor_pos < len(cmd_buf):
|
||||
stdscr.addstr(iy, cx, cmd_buf[cursor_pos],
|
||||
curses.A_REVERSE)
|
||||
else:
|
||||
stdscr.addstr(iy, cx, " ", curses.A_REVERSE)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ PICKER OVERLAY ═══
|
||||
if self._picker is not None:
|
||||
filt = self._picker["filter"]
|
||||
items = [i for i in self.instruments if filt in i] if filt else self.instruments
|
||||
idx = self._picker["index"]
|
||||
pw = min(32, w - 4)
|
||||
ph = min(len(items) + 2, h - 4)
|
||||
px = (w - pw) // 2
|
||||
py = (h - ph) // 2
|
||||
# Border
|
||||
title = f" Ch {self._picker['channel']} "
|
||||
try:
|
||||
stdscr.addstr(py, px, "┌" + title + "─" * (pw - 2 - len(title)) + "┐", curses.color_pair(2) | curses.A_BOLD)
|
||||
for ri in range(1, ph - 1):
|
||||
stdscr.addstr(py + ri, px, "│" + " " * (pw - 2) + "│", curses.color_pair(2))
|
||||
stdscr.addstr(py + ph - 1, px, "└" + "─" * (pw - 2) + "┘", curses.color_pair(2))
|
||||
except curses.error:
|
||||
pass
|
||||
# Items
|
||||
vis_h = ph - 2
|
||||
scroll = self._picker["scroll"]
|
||||
if idx < scroll:
|
||||
scroll = idx
|
||||
elif idx >= scroll + vis_h:
|
||||
scroll = idx - vis_h + 1
|
||||
self._picker["scroll"] = scroll
|
||||
for ri in range(vis_h):
|
||||
li = scroll + ri
|
||||
if li >= len(items):
|
||||
break
|
||||
name = items[li][:pw - 4]
|
||||
attr = curses.A_REVERSE | curses.color_pair(1) if li == idx else curses.color_pair(0)
|
||||
try:
|
||||
padded = f" {name}" + " " * (pw - 3 - len(name))
|
||||
stdscr.addstr(py + 1 + ri, px + 1, padded, attr)
|
||||
except curses.error:
|
||||
pass
|
||||
# Filter hint
|
||||
hint = f"/{filt}" if filt else "type to filter"
|
||||
try:
|
||||
stdscr.addstr(py + ph - 1, px + 2, hint[:pw - 4], curses.color_pair(3) | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
stdscr.refresh()
|
||||
|
||||
# ═══ INPUT ═══
|
||||
ch = stdscr.getch()
|
||||
if ch == -1:
|
||||
continue
|
||||
|
||||
# KEYBOARD MODE: all keys go to MIDI
|
||||
if self.kbd_active:
|
||||
if ch == 27: # Escape exits keyboard mode
|
||||
# Release all held notes
|
||||
for k in list(self._kbd_held):
|
||||
self.engine.keyboard_note(k, on=False)
|
||||
self._kbd_held.clear()
|
||||
self.kbd_active = False
|
||||
self.engine._keyboard_channel = None
|
||||
self.log("Keyboard off (Esc)", 3)
|
||||
elif ch == curses.KEY_UP:
|
||||
self.engine._keyboard_octave = min(8, self.engine._keyboard_octave + 1)
|
||||
self.log(f"Octave ↑ {self.engine._keyboard_octave}", 2)
|
||||
elif ch == curses.KEY_DOWN:
|
||||
self.engine._keyboard_octave = max(0, self.engine._keyboard_octave - 1)
|
||||
self.log(f"Octave ↓ {self.engine._keyboard_octave}", 2)
|
||||
elif 32 <= ch < 127:
|
||||
key = chr(ch).lower()
|
||||
now = time.time()
|
||||
if key in self._kbd_held:
|
||||
# Key repeat — just refresh the timer
|
||||
self._kbd_held[key] = now
|
||||
else:
|
||||
# New key press
|
||||
played = self.engine.keyboard_note(key, on=True)
|
||||
if played:
|
||||
self._kbd_held[key] = now
|
||||
|
||||
# Release keys that haven't been pressed for 150ms
|
||||
now = time.time()
|
||||
expired = [k for k, t in self._kbd_held.items()
|
||||
if now - t > 0.15]
|
||||
for k in expired:
|
||||
self.engine.keyboard_note(k, on=False)
|
||||
del self._kbd_held[k]
|
||||
|
||||
continue
|
||||
|
||||
# PICKER MODE
|
||||
if self._picker is not None:
|
||||
filt = self._picker["filter"]
|
||||
items = [i for i in self.instruments if filt in i] if filt else self.instruments
|
||||
if ch == 27: # Escape cancels
|
||||
self._picker = None
|
||||
elif ch == curses.KEY_UP:
|
||||
self._picker["index"] = max(0, self._picker["index"] - 1)
|
||||
elif ch == curses.KEY_DOWN:
|
||||
self._picker["index"] = min(len(items) - 1, self._picker["index"] + 1)
|
||||
elif ch == 10 or ch == 13: # Enter selects
|
||||
if items:
|
||||
inst = items[self._picker["index"]]
|
||||
n = self._picker["channel"]
|
||||
self.picks[n - 1] = inst
|
||||
self.engine.channel(n, instrument=inst, reverb=0.3)
|
||||
self.log(f"Ch {n} → {inst}", 1)
|
||||
self._picker = None
|
||||
elif ch == curses.KEY_BACKSPACE or ch == 127:
|
||||
if filt:
|
||||
self._picker["filter"] = filt[:-1]
|
||||
self._picker["index"] = 0
|
||||
self._picker["scroll"] = 0
|
||||
elif 32 <= ch < 127:
|
||||
self._picker["filter"] = filt + chr(ch)
|
||||
self._picker["index"] = 0
|
||||
self._picker["scroll"] = 0
|
||||
continue
|
||||
|
||||
if ch == 10 or ch == 13:
|
||||
if cmd_buf.strip():
|
||||
cmd_history.append(cmd_buf)
|
||||
self._handle_command(cmd_buf.strip())
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
history_idx = -1
|
||||
elif ch == 27:
|
||||
if self.engine._keyboard_channel and not cmd_buf:
|
||||
# Escape exits keyboard mode
|
||||
self.engine._keyboard_channel = None
|
||||
self.log("Keyboard off (Esc)", 3)
|
||||
else:
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
elif ch == curses.KEY_BACKSPACE or ch == 127:
|
||||
if cursor_pos > 0:
|
||||
cmd_buf = cmd_buf[:cursor_pos - 1] + cmd_buf[cursor_pos:]
|
||||
cursor_pos -= 1
|
||||
elif ch == curses.KEY_LEFT:
|
||||
cursor_pos = max(0, cursor_pos - 1)
|
||||
elif ch == curses.KEY_RIGHT:
|
||||
cursor_pos = min(len(cmd_buf), cursor_pos + 1)
|
||||
elif ch == curses.KEY_HOME or ch == 1: # Ctrl-A
|
||||
cursor_pos = 0
|
||||
elif ch == curses.KEY_END or ch == 5: # Ctrl-E
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif ch == curses.KEY_UP:
|
||||
if cmd_history and history_idx < len(cmd_history) - 1:
|
||||
history_idx += 1
|
||||
cmd_buf = cmd_history[-(history_idx + 1)]
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif ch == curses.KEY_DOWN:
|
||||
if history_idx > 0:
|
||||
history_idx -= 1
|
||||
cmd_buf = cmd_history[-(history_idx + 1)]
|
||||
cursor_pos = len(cmd_buf)
|
||||
else:
|
||||
history_idx = -1
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
elif ch == 9: # Tab
|
||||
if tab_matches and tab_prefix == cmd_buf:
|
||||
tab_idx = (tab_idx + 1) % len(tab_matches)
|
||||
cmd_buf = tab_matches[tab_idx]
|
||||
else:
|
||||
tab_matches = self._complete(cmd_buf)
|
||||
tab_prefix = cmd_buf
|
||||
if len(tab_matches) == 1:
|
||||
cmd_buf = tab_matches[0]
|
||||
tab_matches = []
|
||||
elif tab_matches:
|
||||
tab_idx = 0
|
||||
cmd_buf = tab_matches[0]
|
||||
else:
|
||||
tab_matches = []
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif 32 <= ch < 127:
|
||||
cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:]
|
||||
cursor_pos += 1
|
||||
tab_matches = []
|
||||
tab_idx = -1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
|
||||
self.engine.stop()
|
||||
|
||||
def _complete(self, text):
|
||||
"""Return list of completions for current input."""
|
||||
parts = text.split()
|
||||
commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export",
|
||||
"save", "load", "seed", "list", "patterns", "octave", "help", "exit"]
|
||||
fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus",
|
||||
"detune", "spread", "analog", "distortion", "delay",
|
||||
"tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"]
|
||||
|
||||
if not parts:
|
||||
return [c + " " for c in commands]
|
||||
|
||||
# Completing first word
|
||||
if len(parts) == 1 and not text.endswith(" "):
|
||||
prefix = parts[0].lower()
|
||||
return [c + " " for c in commands if c.startswith(prefix)]
|
||||
|
||||
verb = parts[0].lower()
|
||||
|
||||
# ch <n> <instrument>
|
||||
if verb == "ch" and len(parts) == 3 and not text.endswith(" "):
|
||||
prefix = parts[2].lower()
|
||||
return [f"ch {parts[1]} {i} " for i in self.instruments
|
||||
if i.startswith(prefix)]
|
||||
if verb == "ch" and len(parts) == 2 and text.endswith(" "):
|
||||
return [f"ch {parts[1]} {i} " for i in self.instruments]
|
||||
|
||||
# drums <pattern>
|
||||
if verb == "drums" and len(parts) == 2 and not text.endswith(" "):
|
||||
prefix = parts[1].lower()
|
||||
matches = [p for p in self.drum_patterns if p.startswith(prefix)]
|
||||
return [f"drums {m} " for m in matches]
|
||||
if verb == "drums" and len(parts) == 1 and text.endswith(" "):
|
||||
return [f"drums {p} " for p in self.drum_patterns]
|
||||
|
||||
# fx <n> <param> <val>
|
||||
if verb == "fx" and len(parts) == 3 and not text.endswith(" "):
|
||||
prefix = parts[2].lower()
|
||||
return [f"fx {parts[1]} {p} " for p in fx_params
|
||||
if p.startswith(prefix)]
|
||||
if verb == "fx" and len(parts) == 2 and text.endswith(" "):
|
||||
return [f"fx {parts[1]} {p} " for p in fx_params]
|
||||
|
||||
return []
|
||||
|
||||
def _handle_command(self, cmd):
|
||||
parts = cmd.split()
|
||||
if not parts:
|
||||
return
|
||||
verb = parts[0].lower()
|
||||
|
||||
if verb in ("quit", "q", "exit"):
|
||||
self.running = False
|
||||
elif verb in ("help", "h"):
|
||||
self.log("ch <n> [inst] | fx <n> <param> <val> | drums [pat|-]", 2)
|
||||
self.log("seed [n] | list | patterns | exit", 2)
|
||||
elif verb == "ch" and len(parts) == 2:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
if 1 <= n <= len(self.picks):
|
||||
# Open instrument picker with current instrument pre-selected
|
||||
current = self.picks[n - 1]
|
||||
idx = self.instruments.index(current) if current in self.instruments else 0
|
||||
self._picker = {"channel": n, "index": idx, "scroll": max(0, idx - 5), "filter": ""}
|
||||
else:
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
except ValueError:
|
||||
self.log("ch <1-8> [instrument]", 4)
|
||||
elif verb == "ch" and len(parts) >= 3:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
inst = parts[2]
|
||||
if inst not in INSTRUMENTS:
|
||||
self.log(f"Unknown: {inst}", 4)
|
||||
return
|
||||
if not (1 <= n <= len(self.picks)):
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
return
|
||||
self.picks[n - 1] = inst
|
||||
self.engine.channel(n, instrument=inst, reverb=0.3)
|
||||
self.log(f"Ch {n} → {inst}", 1)
|
||||
except (ValueError, IndexError):
|
||||
self.log("ch <1-8> <instrument>", 4)
|
||||
elif verb == "fx" and len(parts) >= 4:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
param = parts[2]
|
||||
val = float(parts[3])
|
||||
if not (1 <= n <= len(self.picks)):
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
return
|
||||
if n not in self.engine.channels:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
return
|
||||
channel = self.engine.channels[n]
|
||||
if param == "reverb":
|
||||
channel.reverb = val
|
||||
channel._cache.clear()
|
||||
elif param == "lowpass":
|
||||
channel.lowpass = val
|
||||
channel._cache.clear()
|
||||
elif param == "volume":
|
||||
channel.volume = val
|
||||
elif hasattr(channel, param):
|
||||
setattr(channel, param, val)
|
||||
channel._cache.clear()
|
||||
else:
|
||||
self.log(f"Unknown param: {param}", 4)
|
||||
return
|
||||
self.log(f"Ch {n} {param}={val}", 1)
|
||||
except (ValueError, IndexError):
|
||||
self.log("fx <1-8> <param> <value>", 4)
|
||||
elif verb == "fx" and len(parts) == 2:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
if n in self.engine.channels:
|
||||
ch = self.engine.channels[n]
|
||||
self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2)
|
||||
else:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
except ValueError:
|
||||
self.log("fx <ch> <param> <value>", 4)
|
||||
elif verb == "fx" and len(parts) <= 1:
|
||||
self.log("Volume/Mix:", 3)
|
||||
self.log(" volume pan reverb delay", 2)
|
||||
self.log("Filter:", 3)
|
||||
self.log(" lowpass lowpass_q", 2)
|
||||
self.log("Modulation:", 3)
|
||||
self.log(" chorus detune spread tremolo_depth", 2)
|
||||
self.log(" phaser analog", 2)
|
||||
self.log("Drive:", 3)
|
||||
self.log(" distortion saturation", 2)
|
||||
self.log("Synth:", 3)
|
||||
self.log(" sub_osc noise_mix", 2)
|
||||
self.log("", 0)
|
||||
self.log("fx <ch> <param> <value>", 2)
|
||||
elif verb == "drums" and len(parts) == 1:
|
||||
self.log(f"Current: {self.current_drum}", 5)
|
||||
for i in range(0, len(self.drum_patterns), 4):
|
||||
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "drums" and len(parts) >= 2:
|
||||
pat = " ".join(parts[1:])
|
||||
if pat in ("none", "off", "mute", "-"):
|
||||
self.current_drum = "none"
|
||||
self.engine._drum_pattern = None
|
||||
self.engine._drum_channel = None
|
||||
self.log("Drums off", 4)
|
||||
else:
|
||||
try:
|
||||
self.current_drum = pat
|
||||
self.engine.drums(pat, volume=0.5)
|
||||
self.log(f"Drums → {pat}", 5)
|
||||
except Exception as e:
|
||||
self.log(f"Error: {e}", 4)
|
||||
elif verb == "seed" and len(parts) == 1:
|
||||
self.log(f"Seed: {self.seed}", 2)
|
||||
elif verb == "seed" and len(parts) >= 2:
|
||||
try:
|
||||
self.seed = int(parts[1])
|
||||
rng = random.Random(self.seed)
|
||||
self.picks = rng.sample(self.instruments, 8)
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
self.engine.channel(i, instrument=inst, reverb=0.3)
|
||||
self.log(f"Seed → {self.seed}", 1)
|
||||
except ValueError:
|
||||
self.log("seed <number>", 4)
|
||||
elif verb == "list":
|
||||
for i in range(0, len(self.instruments), 3):
|
||||
row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "patterns":
|
||||
for i in range(0, len(self.drum_patterns), 4):
|
||||
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "pan" and len(parts) >= 3:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
val = float(parts[2])
|
||||
val = max(-1.0, min(1.0, val))
|
||||
if n in self.engine.channels:
|
||||
self.engine.channels[n].pan = val
|
||||
self.log(f"Ch {n} pan={val:+.1f}", 1)
|
||||
else:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
except ValueError:
|
||||
self.log("pan <1-8> <-1..1>", 4)
|
||||
elif verb == "kbd":
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
ch_num = int(parts[1])
|
||||
self.engine._keyboard_channel = ch_num
|
||||
if len(parts) >= 3:
|
||||
self.engine._keyboard_octave = int(parts[2])
|
||||
except ValueError:
|
||||
self.log("kbd <channel> [octave]", 4)
|
||||
return
|
||||
else:
|
||||
self.engine._keyboard_channel = self.engine._keyboard_channel or 1
|
||||
self.kbd_active = True
|
||||
# Make sure wavetables are cached for this channel
|
||||
ch_num = self.engine._keyboard_channel
|
||||
if ch_num in self.engine.channels:
|
||||
channel = self.engine.channels[ch_num]
|
||||
if not channel._cache:
|
||||
self.log("Rendering wavetables...", 3)
|
||||
for midi_note in range(36, 97):
|
||||
channel._get_wave(midi_note, 44100 * 3)
|
||||
self.log(f"♪ Keyboard ON ch{ch_num} oct{self.engine._keyboard_octave} (Esc=exit, ↑↓=octave)", 1)
|
||||
elif verb == "rec":
|
||||
self.engine.start_recording()
|
||||
self.log("● Recording...", 4)
|
||||
elif verb == "stop" and self.engine._recording:
|
||||
self.engine.stop_recording()
|
||||
n = len(self.engine._record_events)
|
||||
self.log(f"■ Stopped ({n} events)", 3)
|
||||
elif verb == "export":
|
||||
fname = parts[1] if len(parts) > 1 else "recording.mid"
|
||||
score = self.engine.export_recording(fname)
|
||||
if score:
|
||||
self.log(f"Exported → {fname}", 1)
|
||||
else:
|
||||
self.log("Nothing to export", 4)
|
||||
elif verb == "save" and len(parts) >= 2:
|
||||
fname = parts[1]
|
||||
if not fname.endswith(".json"):
|
||||
fname += ".json"
|
||||
self.engine.seed = self.seed
|
||||
self.engine.save_config(fname)
|
||||
self.log(f"Saved → {fname}", 1)
|
||||
elif verb == "load" and len(parts) >= 2:
|
||||
fname = parts[1]
|
||||
if not fname.endswith(".json"):
|
||||
fname += ".json"
|
||||
try:
|
||||
self.engine.load_config(fname)
|
||||
self.log(f"Loaded ← {fname}", 1)
|
||||
# Update picks from channels
|
||||
for ch, channel in self.engine.channels.items():
|
||||
if 1 <= ch <= 8:
|
||||
self.picks[ch - 1] = channel.synth_name
|
||||
except Exception as e:
|
||||
self.log(f"Error: {e}", 4)
|
||||
elif verb == "octave" and len(parts) >= 2:
|
||||
try:
|
||||
self.engine._keyboard_octave = int(parts[1])
|
||||
self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2)
|
||||
except ValueError:
|
||||
self.log("octave <0-8>", 4)
|
||||
else:
|
||||
self.log(f"? {cmd}", 4)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="PyTheory Live — real-time MIDI synthesizer")
|
||||
parser.add_argument("seed", nargs="?", type=int, default=None, help="Random seed for instruments")
|
||||
parser.add_argument("--port", "-p", default="OP-XY", help="MIDI port name (default: OP-XY)")
|
||||
parser.add_argument("--channels", "-c", type=int, default=8, help="Number of channels (default: 8)")
|
||||
parser.add_argument("--drums", "-d", default="rock", help="Drum pattern (default: rock, 'none' to disable)")
|
||||
parser.add_argument("--buffer", "-b", type=int, default=128, help="Audio buffer size (default: 128)")
|
||||
args = parser.parse_args()
|
||||
|
||||
tui = LiveTUI(seed=args.seed, port=args.port, n_channels=args.channels,
|
||||
drum_pattern=args.drums, buffer_size=args.buffer)
|
||||
curses.wrapper(tui.run)
|
||||
|
||||
# Print resume command on exit
|
||||
cmd_parts = ["pytheory-live", str(tui.seed)]
|
||||
if tui.port != "OP-XY":
|
||||
cmd_parts += ["--port", tui.port]
|
||||
if tui.n_channels != 8:
|
||||
cmd_parts += ["--channels", str(tui.n_channels)]
|
||||
if tui.current_drum != "rock":
|
||||
cmd_parts += ["--drums", tui.current_drum]
|
||||
if tui.buffer_size != 128:
|
||||
cmd_parts += ["--buffer", str(tui.buffer_size)]
|
||||
print(f"\nResume this session with:\n {' '.join(cmd_parts)}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+893
-101
File diff suppressed because it is too large
Load Diff
+43
-13
@@ -23,6 +23,15 @@ INSTRUMENTS = {
|
||||
"tremolo_depth": 0.12, "tremolo_rate": 4.5,
|
||||
"analog": 0.15,
|
||||
},
|
||||
"wurlitzer": {
|
||||
"synth": "wurlitzer_synth", "envelope": "none",
|
||||
"tremolo_depth": 0.18, "tremolo_rate": 5.0,
|
||||
"analog": 0.2,
|
||||
},
|
||||
"pipe_organ": {
|
||||
"synth": "pipe_organ_synth", "envelope": "none",
|
||||
"reverb": 0.5, "reverb_type": "cathedral",
|
||||
},
|
||||
"organ": {
|
||||
"synth": "organ_synth", "envelope": "organ",
|
||||
"chorus": 0.2, "chorus_rate": 5.5,
|
||||
@@ -256,6 +265,26 @@ INSTRUMENTS = {
|
||||
"lowpass": 4500,
|
||||
"humanize": 0.2,
|
||||
},
|
||||
"crotales": {
|
||||
"synth": "crotales_synth", "envelope": "none",
|
||||
"reverb": 0.3,
|
||||
"humanize": 0.2,
|
||||
},
|
||||
"tingsha": {
|
||||
"synth": "tingsha_synth", "envelope": "none",
|
||||
"reverb": 0.4,
|
||||
"humanize": 0.2,
|
||||
},
|
||||
"singing_bowl": {
|
||||
"synth": "singing_bowl_strike_synth", "envelope": "none",
|
||||
"reverb": 0.5,
|
||||
"humanize": 0.2,
|
||||
},
|
||||
"singing_bowl_ring": {
|
||||
"synth": "singing_bowl_ring_synth", "envelope": "none",
|
||||
"reverb": 0.5,
|
||||
"humanize": 0.2,
|
||||
},
|
||||
|
||||
# ── Synth presets ──
|
||||
"synth_lead": {
|
||||
@@ -303,8 +332,8 @@ INSTRUMENTS = {
|
||||
"humanize": 0.15,
|
||||
},
|
||||
"choir": {
|
||||
"synth": "vocal_synth", "envelope": "pad",
|
||||
"detune": 8, "spread": 0.4,
|
||||
"synth": "choir_synth", "envelope": "none",
|
||||
"detune": 6, "spread": 0.3, "ensemble": 6,
|
||||
"reverb": 0.45, "reverb_type": "cathedral",
|
||||
},
|
||||
"granular_texture": {
|
||||
@@ -321,10 +350,7 @@ INSTRUMENTS = {
|
||||
|
||||
# ── Percussion / Mallet ──
|
||||
"vibraphone": {
|
||||
"synth": "fm", "envelope": "mallet",
|
||||
"fm_ratio": 1.0, "fm_index": 1.0,
|
||||
"lowpass": 5000,
|
||||
"tremolo_depth": 0.3, "tremolo_rate": 5.5,
|
||||
"synth": "vibraphone_synth", "envelope": "none",
|
||||
"reverb": 0.3, "reverb_type": "plate",
|
||||
},
|
||||
"marimba": {
|
||||
@@ -354,22 +380,19 @@ INSTRUMENTS = {
|
||||
# ── Woodwinds (continued) ──
|
||||
"saxophone": {
|
||||
"synth": "saxophone_synth", "envelope": "bowed",
|
||||
"humanize": 0.15, "vel_to_filter": 1500,
|
||||
"humanize": 0.15,
|
||||
},
|
||||
"alto_sax": {
|
||||
"synth": "saxophone_synth", "envelope": "bowed",
|
||||
"humanize": 0.15, "vel_to_filter": 1800,
|
||||
"humanize": 0.15,
|
||||
},
|
||||
"tenor_sax": {
|
||||
"synth": "saxophone_synth", "envelope": "bowed",
|
||||
"lowpass": 3000,
|
||||
"humanize": 0.15, "vel_to_filter": 1200,
|
||||
"humanize": 0.15,
|
||||
},
|
||||
"bari_sax": {
|
||||
"synth": "saxophone_synth", "envelope": "bowed",
|
||||
"lowpass": 2000,
|
||||
"humanize": 0.15, "vel_to_filter": 800,
|
||||
"sub_osc": 0.15,
|
||||
"humanize": 0.15, "sub_osc": 0.15,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -579,6 +602,13 @@ class DrumSound(Enum):
|
||||
BASS_3 = 126 # middle
|
||||
BASS_4 = 127 # fourth
|
||||
BASS_5 = 80 # lowest (biggest) bass drum
|
||||
# Effects / world percussion
|
||||
RAINSTICK = 81 # cascading pebbles through cactus tube (steep angle)
|
||||
RAINSTICK_SLOW = 128 # gentle trickle (shallow angle)
|
||||
OCEAN_DRUM = 82 # tilting drum with steel beads — surf wash
|
||||
CABASA = 83 # metal bead chain wrapped around cylinder
|
||||
WIND_CHIMES = 84 # suspended metal tubes struck by wind/hand
|
||||
FINGER_CYMBAL = 85 # single small cymbal tap (zill)
|
||||
|
||||
|
||||
class _DrumTone:
|
||||
|
||||
+730
@@ -0,0 +1,730 @@
|
||||
"""PyTheory Live — interactive MIDI synthesizer with TUI."""
|
||||
|
||||
import curses
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
|
||||
from pytheory.live import LiveEngine
|
||||
from pytheory.rhythm import INSTRUMENTS, Pattern
|
||||
|
||||
|
||||
NOTE_NAMES = ['C', 'C#', 'D', 'D#', 'E', 'F', 'F#', 'G', 'G#', 'A', 'A#', 'B']
|
||||
|
||||
|
||||
def note_name(midi):
|
||||
return f"{NOTE_NAMES[midi % 12]}{midi // 12 - 1}"
|
||||
|
||||
|
||||
class LiveTUI:
|
||||
def __init__(self, seed=None, port="OP-XY", n_channels=8,
|
||||
drum_pattern="rock", buffer_size=128):
|
||||
self.seed = seed or random.randint(0, 9999)
|
||||
self.port = port
|
||||
self.n_channels = n_channels
|
||||
self.buffer_size = buffer_size
|
||||
self.engine = None
|
||||
self.log_lines = []
|
||||
self.max_log = 500
|
||||
self.running = True
|
||||
self.instruments = sorted([k for k in INSTRUMENTS.keys()
|
||||
if k not in ("808_bass",)])
|
||||
self.drum_patterns = sorted(Pattern.list_presets())
|
||||
self.current_drum = drum_pattern
|
||||
self.picks = []
|
||||
self.bpm = "—"
|
||||
self.status = "Init"
|
||||
self.status_color = 3
|
||||
self._build_engine()
|
||||
|
||||
def _build_engine(self):
|
||||
rng = random.Random(self.seed)
|
||||
self.picks = rng.sample(self.instruments, min(self.n_channels, len(self.instruments)))
|
||||
self.engine = LiveEngine(buffer_size=self.buffer_size)
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
self.engine.channel(i, instrument=inst, reverb=0.3)
|
||||
if self.current_drum and self.current_drum not in ("none", "-"):
|
||||
self.engine.drums(self.current_drum, volume=0.5)
|
||||
self.engine.cc(0, "lowpass", min_val=300, max_val=8000)
|
||||
|
||||
def log(self, msg, color=0):
|
||||
self.log_lines.append((time.time(), msg, color))
|
||||
if len(self.log_lines) > self.max_log:
|
||||
self.log_lines = self.log_lines[-self.max_log:]
|
||||
|
||||
def _patch_engine_logging(self):
|
||||
original_cb = self.engine._midi_callback
|
||||
|
||||
def logging_cb(event, data=None):
|
||||
msg, _ = event
|
||||
if len(msg) == 0:
|
||||
return
|
||||
if msg[0] == 0xF8:
|
||||
if self.engine._bpm > 10:
|
||||
self.bpm = f"{self.engine._bpm:.0f}"
|
||||
elif msg[0] == 0xFA:
|
||||
self.log("▶ Start", 5)
|
||||
self.status = "Playing"
|
||||
self.status_color = 1
|
||||
elif msg[0] == 0xFC:
|
||||
self.log("■ Stop", 4)
|
||||
self.status = "Stopped"
|
||||
self.status_color = 4
|
||||
elif msg[0] == 0xFB:
|
||||
self.log("▶ Continue", 5)
|
||||
self.status = "Playing"
|
||||
self.status_color = 1
|
||||
elif len(msg) >= 3:
|
||||
status = msg[0]
|
||||
ch = (status & 0x0F) + 1
|
||||
msg_type = status & 0xF0
|
||||
if msg_type == 0x90 and msg[2] > 0:
|
||||
inst = self.picks[ch - 1] if 1 <= ch <= 8 else "?"
|
||||
self.log(f"♪ {ch}:{inst} {note_name(msg[1])} v={msg[2]}", 1)
|
||||
elif msg_type == 0xB0:
|
||||
self.log(f"⚙ CC{msg[1]}={msg[2]}", 3)
|
||||
elif msg_type == 0xE0:
|
||||
bend = ((msg[2] << 7) | msg[1]) - 8192
|
||||
self.log(f"↕ Bend ch{ch} {bend:+d}", 3)
|
||||
original_cb(event, data)
|
||||
|
||||
self.engine._midi_callback = logging_cb
|
||||
|
||||
def run(self, stdscr):
|
||||
curses.curs_set(0)
|
||||
curses.use_default_colors()
|
||||
curses.init_pair(1, curses.COLOR_GREEN, -1)
|
||||
curses.init_pair(2, curses.COLOR_CYAN, -1)
|
||||
curses.init_pair(3, curses.COLOR_YELLOW, -1)
|
||||
curses.init_pair(4, curses.COLOR_RED, -1)
|
||||
curses.init_pair(5, curses.COLOR_MAGENTA, -1)
|
||||
curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_BLUE)
|
||||
curses.init_pair(7, curses.COLOR_BLACK, curses.COLOR_GREEN)
|
||||
curses.init_pair(8, curses.COLOR_BLACK, curses.COLOR_YELLOW)
|
||||
curses.init_pair(9, curses.COLOR_BLACK, curses.COLOR_RED)
|
||||
stdscr.nodelay(True)
|
||||
stdscr.timeout(60)
|
||||
|
||||
self._patch_engine_logging()
|
||||
|
||||
# Pre-render with progress
|
||||
self.status = "Rendering"
|
||||
self.status_color = 3
|
||||
stdscr.erase()
|
||||
stdscr.addstr(1, 2, "PyTheory Live", curses.A_BOLD)
|
||||
stdscr.addstr(2, 2, "Pre-rendering wavetables...", curses.color_pair(3))
|
||||
stdscr.refresh()
|
||||
|
||||
n_samples = 44100 * 3
|
||||
count = 0
|
||||
for _, channel in self.engine.channels.items():
|
||||
if channel.is_drums:
|
||||
continue
|
||||
for midi_note in range(36, 97):
|
||||
channel._get_wave(midi_note, n_samples)
|
||||
count += 1
|
||||
if count % 50 == 0:
|
||||
stdscr.addstr(3, 2, f" {count} wavetables...", curses.color_pair(2))
|
||||
stdscr.refresh()
|
||||
|
||||
self.log(f"Cached {count} wavetables", 1)
|
||||
self.log("Starting engine...", 3)
|
||||
self.status = "Starting"
|
||||
self.status_color = 3
|
||||
|
||||
# Start engine
|
||||
def run_engine():
|
||||
import io
|
||||
capture = io.StringIO()
|
||||
old = sys.stdout
|
||||
sys.stdout = capture
|
||||
try:
|
||||
self.engine.start(port=self.port)
|
||||
except Exception as e:
|
||||
self.log(f"Engine error: {e}", 4)
|
||||
finally:
|
||||
sys.stdout = old
|
||||
output = capture.getvalue()
|
||||
if output.strip():
|
||||
for line in output.strip().split('\n'):
|
||||
self.log(line.strip(), 2)
|
||||
|
||||
engine_thread = threading.Thread(target=run_engine, daemon=True)
|
||||
engine_thread.start()
|
||||
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
cmd_history = []
|
||||
history_idx = -1
|
||||
tab_matches = []
|
||||
tab_idx = -1
|
||||
tab_prefix = ""
|
||||
self.kbd_active = False
|
||||
self._kbd_held = {} # key → last_press_time
|
||||
|
||||
while self.running:
|
||||
try:
|
||||
# Update status from engine state
|
||||
if (self.engine._stream and self.engine._stream.active
|
||||
and self.status == "Starting"):
|
||||
self.status = "Listening"
|
||||
self.status_color = 2
|
||||
self.log("Audio stream active", 1)
|
||||
|
||||
h, w = stdscr.getmaxyx()
|
||||
if h < 10 or w < 40:
|
||||
time.sleep(0.1)
|
||||
continue
|
||||
stdscr.erase()
|
||||
|
||||
div = max(30, w * 3 // 5)
|
||||
cfg_x = div + 2
|
||||
|
||||
# ═══ HEADER BAR ═══
|
||||
header = f" PyTheory Live "
|
||||
stdscr.addstr(0, 0, header, curses.color_pair(6) | curses.A_BOLD)
|
||||
|
||||
# Status badge
|
||||
badge_colors = {1: 7, 2: 6, 3: 8, 4: 9}
|
||||
badge_cp = badge_colors.get(self.status_color, 6)
|
||||
badge = f" {self.status} "
|
||||
x = len(header)
|
||||
stdscr.addstr(0, x, badge, curses.color_pair(badge_cp) | curses.A_BOLD)
|
||||
x += len(badge)
|
||||
|
||||
kbd_mode = ""
|
||||
if self.engine._keyboard_channel:
|
||||
kbd_mode = f" KBD:ch{self.engine._keyboard_channel}"
|
||||
rec_mode = " ●REC" if self.engine._recording else ""
|
||||
info = f" BPM:{self.bpm} drums:{self.current_drum} seed:{self.seed}{kbd_mode}{rec_mode}"
|
||||
try:
|
||||
stdscr.addstr(0, x, info[:w - x], curses.color_pair(6))
|
||||
# Fill rest of header
|
||||
remaining = w - x - len(info)
|
||||
if remaining > 0:
|
||||
stdscr.addstr(0, x + len(info), " " * remaining, curses.color_pair(6))
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ DIVIDER ═══
|
||||
for y in range(1, h - 2):
|
||||
try:
|
||||
stdscr.addch(y, div, '│', curses.color_pair(2) | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ LEFT: EVENTS ═══
|
||||
try:
|
||||
stdscr.addstr(1, 1, " Events ", curses.color_pair(2) | curses.A_BOLD)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
log_h = h - 5
|
||||
visible = self.log_lines[-log_h:] if log_h > 0 else []
|
||||
for i, (ts, msg, color) in enumerate(visible):
|
||||
ly = 2 + i
|
||||
if ly >= h - 3:
|
||||
break
|
||||
# Fade old messages
|
||||
age = time.time() - ts
|
||||
attr = curses.A_DIM if age > 8 else 0
|
||||
try:
|
||||
stdscr.addstr(ly, 1, msg[:div - 2],
|
||||
curses.color_pair(color) | attr)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ RIGHT: CONFIG ═══
|
||||
try:
|
||||
stdscr.addstr(1, cfg_x, " Config ",
|
||||
curses.color_pair(2) | curses.A_BOLD)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
y = 3
|
||||
rw = w - cfg_x - 1
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + 1, ":", curses.A_DIM)
|
||||
stdscr.addstr(y, cfg_x + 2, inst[:rw - 2],
|
||||
curses.color_pair(1))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
# VU meters per channel
|
||||
y += 1
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, "Levels:", curses.A_BOLD | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
if i in self.engine.channels:
|
||||
lv = self.engine.channels[i].level
|
||||
bars = int(min(lv, 1.0) * 16)
|
||||
meter = "|" * bars + "-" * (16 - bars)
|
||||
color = 1 if bars < 15 else 3 if bars < 18 else 4
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{i}", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + 1, f" {meter}", curses.color_pair(color))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
rec_indicator = " ● REC " if self.engine._recording else ""
|
||||
kbd_indicator = f" kbd:ch{self.engine._keyboard_channel} oct{self.engine._keyboard_octave}" if self.engine._keyboard_channel else ""
|
||||
|
||||
pairs = [
|
||||
("Drums", self.current_drum, 5),
|
||||
("BPM", str(self.bpm), 0),
|
||||
("Latency", f"{self.engine.buffer_size / 44100 * 1000:.1f}ms", 0),
|
||||
("MIDI", self.port, 0),
|
||||
("Seed", str(self.seed), 2),
|
||||
]
|
||||
if rec_indicator:
|
||||
pairs.append(("", rec_indicator, 4))
|
||||
if kbd_indicator:
|
||||
pairs.append(("", kbd_indicator, 2))
|
||||
for label, val, cp in pairs:
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, f"{label}:", curses.A_BOLD)
|
||||
stdscr.addstr(y, cfg_x + len(label) + 1, f" {val}"[:rw],
|
||||
curses.color_pair(cp))
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
y += 1
|
||||
cmds = [
|
||||
"ch <n> [inst]",
|
||||
"fx <n> <param> <val>",
|
||||
"pan <n> <-1..1>",
|
||||
"drums [pattern|-]",
|
||||
"kbd [ch] [oct]",
|
||||
"rec / stop / export",
|
||||
"save/load <file>",
|
||||
"seed [n] / list",
|
||||
"exit",
|
||||
]
|
||||
try:
|
||||
stdscr.addstr(y, cfg_x, "Commands:", curses.color_pair(3))
|
||||
except curses.error:
|
||||
pass
|
||||
for i, c in enumerate(cmds):
|
||||
try:
|
||||
stdscr.addstr(y + 1 + i, cfg_x + 1, c[:rw],
|
||||
curses.color_pair(2) | curses.A_DIM)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
# ═══ INPUT BAR ═══
|
||||
iy = h - 2
|
||||
try:
|
||||
stdscr.addstr(iy - 1, 0, "─" * (w - 1), curses.A_DIM)
|
||||
if self.kbd_active:
|
||||
stdscr.addstr(iy, 0, " KEYBOARD ",
|
||||
curses.color_pair(7) | curses.A_BOLD)
|
||||
stdscr.addstr(iy, 10, " Esc=exit Up/Down=octave ",
|
||||
curses.color_pair(3))
|
||||
else:
|
||||
stdscr.addstr(iy, 0, " $ ",
|
||||
curses.color_pair(1) | curses.A_BOLD)
|
||||
stdscr.addstr(iy, 3, cmd_buf[:w - 5])
|
||||
# Cursor at position
|
||||
cx = 3 + cursor_pos
|
||||
if cx < w - 1:
|
||||
# Show character under cursor inverted, or block at end
|
||||
if cursor_pos < len(cmd_buf):
|
||||
stdscr.addstr(iy, cx, cmd_buf[cursor_pos],
|
||||
curses.A_REVERSE)
|
||||
else:
|
||||
stdscr.addstr(iy, cx, " ", curses.A_REVERSE)
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
stdscr.refresh()
|
||||
|
||||
# ═══ INPUT ═══
|
||||
ch = stdscr.getch()
|
||||
if ch == -1:
|
||||
continue
|
||||
|
||||
# KEYBOARD MODE: all keys go to MIDI
|
||||
if self.kbd_active:
|
||||
if ch == 27: # Escape exits keyboard mode
|
||||
# Release all held notes
|
||||
for k in list(self._kbd_held):
|
||||
self.engine.keyboard_note(k, on=False)
|
||||
self._kbd_held.clear()
|
||||
self.kbd_active = False
|
||||
self.engine._keyboard_channel = None
|
||||
self.log("Keyboard off (Esc)", 3)
|
||||
elif ch == curses.KEY_UP:
|
||||
self.engine._keyboard_octave = min(8, self.engine._keyboard_octave + 1)
|
||||
self.log(f"Octave ↑ {self.engine._keyboard_octave}", 2)
|
||||
elif ch == curses.KEY_DOWN:
|
||||
self.engine._keyboard_octave = max(0, self.engine._keyboard_octave - 1)
|
||||
self.log(f"Octave ↓ {self.engine._keyboard_octave}", 2)
|
||||
elif 32 <= ch < 127:
|
||||
key = chr(ch).lower()
|
||||
now = time.time()
|
||||
if key in self._kbd_held:
|
||||
# Key repeat — just refresh the timer
|
||||
self._kbd_held[key] = now
|
||||
else:
|
||||
# New key press
|
||||
played = self.engine.keyboard_note(key, on=True)
|
||||
if played:
|
||||
self._kbd_held[key] = now
|
||||
|
||||
# Release keys that haven't been pressed for 150ms
|
||||
now = time.time()
|
||||
expired = [k for k, t in self._kbd_held.items()
|
||||
if now - t > 0.15]
|
||||
for k in expired:
|
||||
self.engine.keyboard_note(k, on=False)
|
||||
del self._kbd_held[k]
|
||||
|
||||
continue
|
||||
|
||||
if ch == 10 or ch == 13:
|
||||
if cmd_buf.strip():
|
||||
cmd_history.append(cmd_buf)
|
||||
self._handle_command(cmd_buf.strip())
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
history_idx = -1
|
||||
elif ch == 27:
|
||||
if self.engine._keyboard_channel and not cmd_buf:
|
||||
# Escape exits keyboard mode
|
||||
self.engine._keyboard_channel = None
|
||||
self.log("Keyboard off (Esc)", 3)
|
||||
else:
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
elif ch == curses.KEY_BACKSPACE or ch == 127:
|
||||
if cursor_pos > 0:
|
||||
cmd_buf = cmd_buf[:cursor_pos - 1] + cmd_buf[cursor_pos:]
|
||||
cursor_pos -= 1
|
||||
elif ch == curses.KEY_LEFT:
|
||||
cursor_pos = max(0, cursor_pos - 1)
|
||||
elif ch == curses.KEY_RIGHT:
|
||||
cursor_pos = min(len(cmd_buf), cursor_pos + 1)
|
||||
elif ch == curses.KEY_HOME or ch == 1: # Ctrl-A
|
||||
cursor_pos = 0
|
||||
elif ch == curses.KEY_END or ch == 5: # Ctrl-E
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif ch == curses.KEY_UP:
|
||||
if cmd_history and history_idx < len(cmd_history) - 1:
|
||||
history_idx += 1
|
||||
cmd_buf = cmd_history[-(history_idx + 1)]
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif ch == curses.KEY_DOWN:
|
||||
if history_idx > 0:
|
||||
history_idx -= 1
|
||||
cmd_buf = cmd_history[-(history_idx + 1)]
|
||||
cursor_pos = len(cmd_buf)
|
||||
else:
|
||||
history_idx = -1
|
||||
cmd_buf = ""
|
||||
cursor_pos = 0
|
||||
elif ch == 9: # Tab
|
||||
if tab_matches and tab_prefix == cmd_buf:
|
||||
tab_idx = (tab_idx + 1) % len(tab_matches)
|
||||
cmd_buf = tab_matches[tab_idx]
|
||||
else:
|
||||
tab_matches = self._complete(cmd_buf)
|
||||
tab_prefix = cmd_buf
|
||||
if len(tab_matches) == 1:
|
||||
cmd_buf = tab_matches[0]
|
||||
tab_matches = []
|
||||
elif tab_matches:
|
||||
tab_idx = 0
|
||||
cmd_buf = tab_matches[0]
|
||||
else:
|
||||
tab_matches = []
|
||||
cursor_pos = len(cmd_buf)
|
||||
elif 32 <= ch < 127:
|
||||
cmd_buf = cmd_buf[:cursor_pos] + chr(ch) + cmd_buf[cursor_pos:]
|
||||
cursor_pos += 1
|
||||
tab_matches = []
|
||||
tab_idx = -1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
self.running = False
|
||||
|
||||
self.engine.stop()
|
||||
|
||||
def _complete(self, text):
|
||||
"""Return list of completions for current input."""
|
||||
parts = text.split()
|
||||
commands = ["ch", "fx", "pan", "drums", "kbd", "rec", "stop", "export",
|
||||
"save", "load", "seed", "list", "patterns", "octave", "help", "exit"]
|
||||
fx_params = ["volume", "pan", "lowpass", "lowpass_q", "reverb", "chorus",
|
||||
"detune", "spread", "analog", "distortion", "delay",
|
||||
"tremolo_depth", "saturation", "phaser", "sub_osc", "noise_mix"]
|
||||
|
||||
if not parts:
|
||||
return [c + " " for c in commands]
|
||||
|
||||
# Completing first word
|
||||
if len(parts) == 1 and not text.endswith(" "):
|
||||
prefix = parts[0].lower()
|
||||
return [c + " " for c in commands if c.startswith(prefix)]
|
||||
|
||||
verb = parts[0].lower()
|
||||
|
||||
# ch <n> <instrument>
|
||||
if verb == "ch" and len(parts) == 3 and not text.endswith(" "):
|
||||
prefix = parts[2].lower()
|
||||
return [f"ch {parts[1]} {i} " for i in self.instruments
|
||||
if i.startswith(prefix)]
|
||||
if verb == "ch" and len(parts) == 2 and text.endswith(" "):
|
||||
return [f"ch {parts[1]} {i} " for i in self.instruments]
|
||||
|
||||
# drums <pattern>
|
||||
if verb == "drums" and len(parts) == 2 and not text.endswith(" "):
|
||||
prefix = parts[1].lower()
|
||||
matches = [p for p in self.drum_patterns if p.startswith(prefix)]
|
||||
return [f"drums {m} " for m in matches]
|
||||
if verb == "drums" and len(parts) == 1 and text.endswith(" "):
|
||||
return [f"drums {p} " for p in self.drum_patterns]
|
||||
|
||||
# fx <n> <param> <val>
|
||||
if verb == "fx" and len(parts) == 3 and not text.endswith(" "):
|
||||
prefix = parts[2].lower()
|
||||
return [f"fx {parts[1]} {p} " for p in fx_params
|
||||
if p.startswith(prefix)]
|
||||
if verb == "fx" and len(parts) == 2 and text.endswith(" "):
|
||||
return [f"fx {parts[1]} {p} " for p in fx_params]
|
||||
|
||||
return []
|
||||
|
||||
def _handle_command(self, cmd):
|
||||
parts = cmd.split()
|
||||
if not parts:
|
||||
return
|
||||
verb = parts[0].lower()
|
||||
|
||||
if verb in ("quit", "q", "exit"):
|
||||
self.running = False
|
||||
elif verb in ("help", "h"):
|
||||
self.log("ch <n> [inst] | fx <n> <param> <val> | drums [pat|-]", 2)
|
||||
self.log("seed [n] | list | patterns | exit", 2)
|
||||
elif verb == "ch" and len(parts) == 2:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
if 1 <= n <= len(self.picks):
|
||||
self.log(f"Ch {n}: {self.picks[n-1]}", 2)
|
||||
else:
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
except ValueError:
|
||||
self.log("ch <1-8> [instrument]", 4)
|
||||
elif verb == "ch" and len(parts) >= 3:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
inst = parts[2]
|
||||
if inst not in INSTRUMENTS:
|
||||
self.log(f"Unknown: {inst}", 4)
|
||||
return
|
||||
if not (1 <= n <= len(self.picks)):
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
return
|
||||
self.picks[n - 1] = inst
|
||||
self.engine.channel(n, instrument=inst, reverb=0.3)
|
||||
self.log(f"Ch {n} → {inst}", 1)
|
||||
except (ValueError, IndexError):
|
||||
self.log("ch <1-8> <instrument>", 4)
|
||||
elif verb == "fx" and len(parts) >= 4:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
param = parts[2]
|
||||
val = float(parts[3])
|
||||
if not (1 <= n <= len(self.picks)):
|
||||
self.log(f"Channel 1-{len(self.picks)}", 4)
|
||||
return
|
||||
if n not in self.engine.channels:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
return
|
||||
channel = self.engine.channels[n]
|
||||
if param == "reverb":
|
||||
channel.reverb = val
|
||||
channel._cache.clear()
|
||||
elif param == "lowpass":
|
||||
channel.lowpass = val
|
||||
channel._cache.clear()
|
||||
elif param == "volume":
|
||||
channel.volume = val
|
||||
elif hasattr(channel, param):
|
||||
setattr(channel, param, val)
|
||||
channel._cache.clear()
|
||||
else:
|
||||
self.log(f"Unknown param: {param}", 4)
|
||||
return
|
||||
self.log(f"Ch {n} {param}={val}", 1)
|
||||
except (ValueError, IndexError):
|
||||
self.log("fx <1-8> <param> <value>", 4)
|
||||
elif verb == "fx" and len(parts) == 2:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
if n in self.engine.channels:
|
||||
ch = self.engine.channels[n]
|
||||
self.log(f"Ch {n}: vol={ch.volume} lp={ch.lowpass} rev={ch.reverb}", 2)
|
||||
else:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
except ValueError:
|
||||
self.log("fx <ch> <param> <value>", 4)
|
||||
elif verb == "fx" and len(parts) <= 1:
|
||||
self.log("Volume/Mix:", 3)
|
||||
self.log(" volume pan reverb delay", 2)
|
||||
self.log("Filter:", 3)
|
||||
self.log(" lowpass lowpass_q", 2)
|
||||
self.log("Modulation:", 3)
|
||||
self.log(" chorus detune spread tremolo_depth", 2)
|
||||
self.log(" phaser analog", 2)
|
||||
self.log("Drive:", 3)
|
||||
self.log(" distortion saturation", 2)
|
||||
self.log("Synth:", 3)
|
||||
self.log(" sub_osc noise_mix", 2)
|
||||
self.log("", 0)
|
||||
self.log("fx <ch> <param> <value>", 2)
|
||||
elif verb == "drums" and len(parts) == 1:
|
||||
self.log(f"Current: {self.current_drum}", 5)
|
||||
for i in range(0, len(self.drum_patterns), 4):
|
||||
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "drums" and len(parts) >= 2:
|
||||
pat = " ".join(parts[1:])
|
||||
if pat in ("none", "off", "mute", "-"):
|
||||
self.current_drum = "none"
|
||||
self.engine._drum_pattern = None
|
||||
self.engine._drum_channel = None
|
||||
self.log("Drums off", 4)
|
||||
else:
|
||||
try:
|
||||
self.current_drum = pat
|
||||
self.engine.drums(pat, volume=0.5)
|
||||
self.log(f"Drums → {pat}", 5)
|
||||
except Exception as e:
|
||||
self.log(f"Error: {e}", 4)
|
||||
elif verb == "seed" and len(parts) == 1:
|
||||
self.log(f"Seed: {self.seed}", 2)
|
||||
elif verb == "seed" and len(parts) >= 2:
|
||||
try:
|
||||
self.seed = int(parts[1])
|
||||
rng = random.Random(self.seed)
|
||||
self.picks = rng.sample(self.instruments, 8)
|
||||
for i, inst in enumerate(self.picks, 1):
|
||||
self.engine.channel(i, instrument=inst, reverb=0.3)
|
||||
self.log(f"Seed → {self.seed}", 1)
|
||||
except ValueError:
|
||||
self.log("seed <number>", 4)
|
||||
elif verb == "list":
|
||||
for i in range(0, len(self.instruments), 3):
|
||||
row = " ".join(f"{x:22s}" for x in self.instruments[i:i+3])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "patterns":
|
||||
for i in range(0, len(self.drum_patterns), 4):
|
||||
row = " ".join(f"{x:17s}" for x in self.drum_patterns[i:i+4])
|
||||
self.log(f" {row}", 2)
|
||||
elif verb == "pan" and len(parts) >= 3:
|
||||
try:
|
||||
n = int(parts[1])
|
||||
val = float(parts[2])
|
||||
val = max(-1.0, min(1.0, val))
|
||||
if n in self.engine.channels:
|
||||
self.engine.channels[n].pan = val
|
||||
self.log(f"Ch {n} pan={val:+.1f}", 1)
|
||||
else:
|
||||
self.log(f"Channel {n} not active", 4)
|
||||
except ValueError:
|
||||
self.log("pan <1-8> <-1..1>", 4)
|
||||
elif verb == "kbd":
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
ch_num = int(parts[1])
|
||||
self.engine._keyboard_channel = ch_num
|
||||
if len(parts) >= 3:
|
||||
self.engine._keyboard_octave = int(parts[2])
|
||||
except ValueError:
|
||||
self.log("kbd <channel> [octave]", 4)
|
||||
return
|
||||
else:
|
||||
self.engine._keyboard_channel = self.engine._keyboard_channel or 1
|
||||
self.kbd_active = True
|
||||
# Make sure wavetables are cached for this channel
|
||||
ch_num = self.engine._keyboard_channel
|
||||
if ch_num in self.engine.channels:
|
||||
channel = self.engine.channels[ch_num]
|
||||
if not channel._cache:
|
||||
self.log("Rendering wavetables...", 3)
|
||||
for midi_note in range(36, 97):
|
||||
channel._get_wave(midi_note, 44100 * 3)
|
||||
self.log(f"♪ Keyboard ON ch{ch_num} oct{self.engine._keyboard_octave} (Esc=exit, ↑↓=octave)", 1)
|
||||
elif verb == "rec":
|
||||
self.engine.start_recording()
|
||||
self.log("● Recording...", 4)
|
||||
elif verb == "stop" and self.engine._recording:
|
||||
self.engine.stop_recording()
|
||||
n = len(self.engine._record_events)
|
||||
self.log(f"■ Stopped ({n} events)", 3)
|
||||
elif verb == "export":
|
||||
fname = parts[1] if len(parts) > 1 else "recording.mid"
|
||||
score = self.engine.export_recording(fname)
|
||||
if score:
|
||||
self.log(f"Exported → {fname}", 1)
|
||||
else:
|
||||
self.log("Nothing to export", 4)
|
||||
elif verb == "save" and len(parts) >= 2:
|
||||
fname = parts[1]
|
||||
if not fname.endswith(".json"):
|
||||
fname += ".json"
|
||||
self.engine.seed = self.seed
|
||||
self.engine.save_config(fname)
|
||||
self.log(f"Saved → {fname}", 1)
|
||||
elif verb == "load" and len(parts) >= 2:
|
||||
fname = parts[1]
|
||||
if not fname.endswith(".json"):
|
||||
fname += ".json"
|
||||
try:
|
||||
self.engine.load_config(fname)
|
||||
self.log(f"Loaded ← {fname}", 1)
|
||||
# Update picks from channels
|
||||
for ch, channel in self.engine.channels.items():
|
||||
if 1 <= ch <= 8:
|
||||
self.picks[ch - 1] = channel.synth_name
|
||||
except Exception as e:
|
||||
self.log(f"Error: {e}", 4)
|
||||
elif verb == "octave" and len(parts) >= 2:
|
||||
try:
|
||||
self.engine._keyboard_octave = int(parts[1])
|
||||
self.log(f"Keyboard octave → {self.engine._keyboard_octave}", 2)
|
||||
except ValueError:
|
||||
self.log("octave <0-8>", 4)
|
||||
else:
|
||||
self.log(f"? {cmd}", 4)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description="PyTheory Live — real-time MIDI synthesizer")
|
||||
parser.add_argument("seed", nargs="?", type=int, default=None, help="Random seed for instruments")
|
||||
parser.add_argument("--port", "-p", default="OP-XY", help="MIDI port name (default: OP-XY)")
|
||||
parser.add_argument("--channels", "-c", type=int, default=8, help="Number of channels (default: 8)")
|
||||
parser.add_argument("--drums", "-d", default="rock", help="Drum pattern (default: rock, 'none' to disable)")
|
||||
parser.add_argument("--buffer", "-b", type=int, default=128, help="Audio buffer size (default: 128)")
|
||||
args = parser.parse_args()
|
||||
|
||||
tui = LiveTUI(seed=args.seed, port=args.port, n_channels=args.channels,
|
||||
drum_pattern=args.drums, buffer_size=args.buffer)
|
||||
curses.wrapper(tui.run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
+2
-2
@@ -5333,7 +5333,7 @@ def test_supersaw_wave():
|
||||
@needs_portaudio
|
||||
def test_all_synths_in_enum():
|
||||
from pytheory.play import Synth
|
||||
assert len(Synth) == 42
|
||||
assert len(Synth) == 46
|
||||
for s in Synth:
|
||||
wave = s(440, n_samples=1000)
|
||||
assert len(wave) == 1000
|
||||
@@ -7155,7 +7155,7 @@ def test_score_system_propagates():
|
||||
|
||||
def test_synth_enum_count():
|
||||
from pytheory.play import Synth
|
||||
assert len(Synth) == 42
|
||||
assert len(Synth) == 46
|
||||
|
||||
|
||||
def test_all_synths_render_and_enum_match():
|
||||
|
||||
@@ -690,9 +690,10 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "pytheory"
|
||||
version = "0.40.0"
|
||||
version = "0.40.6"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "rich" },
|
||||
{ name = "scipy", version = "1.15.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
|
||||
{ name = "scipy", version = "1.17.1", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
|
||||
{ name = "sounddevice" },
|
||||
@@ -712,6 +713,7 @@ docs = [
|
||||
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "rich", specifier = ">=14.3.3" },
|
||||
{ name = "scipy" },
|
||||
{ name = "sounddevice" },
|
||||
]
|
||||
@@ -802,6 +804,20 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rich"
|
||||
version = "14.3.3"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "markdown-it-py", version = "3.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
|
||||
{ name = "markdown-it-py", version = "4.0.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
|
||||
{ name = "pygments" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/b3/c6/f3b320c27991c46f43ee9d856302c70dc2d0fb2dba4842ff739d5f46b393/rich-14.3.3.tar.gz", hash = "sha256:b8daa0b9e4eef54dd8cf7c86c03713f53241884e814f4e2f5fb342fe520f639b", size = 230582, upload-time = "2026-02-19T17:23:12.474Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/14/25/b208c5683343959b670dc001595f2f3737e051da617f66c31f7c4fa93abc/rich-14.3.3-py3-none-any.whl", hash = "sha256:793431c1f8619afa7d3b52b2cdec859562b950ea0d4b6b505397612db8d5362d", size = 310458, upload-time = "2026-02-19T17:23:13.732Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "roman-numerals"
|
||||
version = "4.1.0"
|
||||
|
||||
Reference in New Issue
Block a user