From 736683f28e85bc2aa55dee3f4ddce30a696841f6 Mon Sep 17 00:00:00 2001 From: Scoder12 <34356756+Scoder12@users.noreply.github.com> Date: Mon, 27 Jul 2020 14:03:09 -0700 Subject: [PATCH] Add audio code --- src/replit/audio/Makefile | 18 ++ src/replit/audio/docs/Makefile | 20 ++ src/replit/audio/docs/conf.py | 59 ++++ src/replit/audio/docs/index.rst | 36 +++ src/replit/audio/docs/make.bat | 35 ++ src/replit/audio/replit/__init__.py | 473 ++++++++++++++++++++++++++++ src/replit/audio/replit/types.py | 112 +++++++ src/replit/audio/test.py | 67 ++++ 8 files changed, 820 insertions(+) create mode 100644 src/replit/audio/Makefile create mode 100644 src/replit/audio/docs/Makefile create mode 100644 src/replit/audio/docs/conf.py create mode 100644 src/replit/audio/docs/index.rst create mode 100644 src/replit/audio/docs/make.bat create mode 100644 src/replit/audio/replit/__init__.py create mode 100644 src/replit/audio/replit/types.py create mode 100644 src/replit/audio/test.py diff --git a/src/replit/audio/Makefile b/src/replit/audio/Makefile new file mode 100644 index 0000000..d730584 --- /dev/null +++ b/src/replit/audio/Makefile @@ -0,0 +1,18 @@ + + + + + +.PHONY: test, docs + + +test: + @python3 test.py + +docs: + @cd ./docs && make + +docs-%: + + @echo $(shell echo $@ | cut -c6-) + @cd ./docs && make $(shell echo $@ | cut -c6-) diff --git a/src/replit/audio/docs/Makefile b/src/replit/audio/docs/Makefile new file mode 100644 index 0000000..a9e2f44 --- /dev/null +++ b/src/replit/audio/docs/Makefile @@ -0,0 +1,20 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line, and also +# from the environment for the first two. +SPHINXOPTS ?= +SPHINXBUILD ?= sphinx-build +SOURCEDIR = . +BUILDDIR = _build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile clean + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile clean + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) diff --git a/src/replit/audio/docs/conf.py b/src/replit/audio/docs/conf.py new file mode 100644 index 0000000..71f7a1c --- /dev/null +++ b/src/replit/audio/docs/conf.py @@ -0,0 +1,59 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Path setup -------------------------------------------------------------- + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +import os +import sys +sys.path.insert(0, os.path.abspath('../')) +sys.path.append(os.path.abspath('../')) + + +# -- Project information ----------------------------------------------------- + +project = 'replit' +copyright = '2020, repl.it' +author = 'repl.it' + +# The full version, including alpha/beta/rc tags +release = '1.2.0' + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + 'sphinx.ext.autodoc', + 'sphinx.ext.napoleon', + 'sphinx_autodoc_typehints' +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ['_templates'] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'conf.py'] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = 'groundwork' + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ['_static'] diff --git a/src/replit/audio/docs/index.rst b/src/replit/audio/docs/index.rst new file mode 100644 index 0000000..3be8fe3 --- /dev/null +++ b/src/replit/audio/docs/index.rst @@ -0,0 +1,36 @@ +.. replit documentation master file, created by + sphinx-quickstart on Mon Jun 22 18:35:18 2020. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. + +(python) replit api reference. +============================== + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + +Module contents +--------------- + +.. automodule:: replit + :members: + :undoc-members: + :show-inheritance: + +replit.types module +------------------- + +.. automodule:: replit.types + :members: + :undoc-members: + :show-inheritance: + + + +Indices and tables +================== + +* :ref:`genindex` +* :ref:`modindex` +* :ref:`search` diff --git a/src/replit/audio/docs/make.bat b/src/replit/audio/docs/make.bat new file mode 100644 index 0000000..2119f51 --- /dev/null +++ b/src/replit/audio/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=. +set BUILDDIR=_build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O% + +:end +popd diff --git a/src/replit/audio/replit/__init__.py b/src/replit/audio/replit/__init__.py new file mode 100644 index 0000000..fdbdba1 --- /dev/null +++ b/src/replit/audio/replit/__init__.py @@ -0,0 +1,473 @@ +import json +import time +from replit.types import ReaderType, RequestArgs, RequestData, SourceData, AudioStatus, WaveType, file_types +from typing import List +from datetime import datetime, timedelta +from os import path + + +def clear(): + 'Clear is used to clear the terminal.' + print('\033[H\033[2J', end='', flush=True) + + +class InvalidFileType(Exception): + "Exception for when a requested file's type isnt valid" + pass + + +class NoSuchSourceException(Exception): + "Exception used when a source doesn't exist" + pass + + +class Source: + '''A Source is used to get audio that is sent to the user. + + Parameters + ---------- + payload : :py:class:`~replit.types.SourceData` + The payload for the source. + loops : int + How many times the source should loop. + + ''' + __payload: SourceData + _loops: bool + _name: str + + def __init__(self, payload: SourceData, loops: bool): + self.__payload = payload + self._loops = loops + self._name = payload['Name'] + + def __get_source(self) -> SourceData or None: + source = None + with open('/tmp/audioStatus.json', 'r') as f: + data = json.loads(f.read()) + for s in data['Sources']: + if s['ID'] == self.id: + source = s + break + if source: + self.__payload = source + return source + + def __update_source(self, **changes): + s = self.__get_source() + if not s: + raise NoSuchSourceException( + f'No player with id "{id}" found! It might be done playing.') + s.update({key.title(): changes[key] for key in changes}) + with open('/tmp/audio', 'w') as f: + f.write(json.dumps(s)) + self.__get_source() + + @property + def name(self) -> str: + 'The name of the source' + return self._name + + def get_start_time(self) -> datetime: + 'When the source started plaing' + timestamp_str = self.__payload['StartTime'] + timestamp = datetime.strptime( + timestamp_str[:-4], "%Y-%m-%dT%H:%M:%S.%f") + return timestamp + + start_time: datetime = property(get_start_time) + 'Property wrapper for :py:meth:`~replit.Source.get_start_time`' + + @property + def path(self) -> str or None: + 'The path to the source, if available.' + data = self.__payload + if ReaderType(data['Type']) in file_types: + return self.__payload['Request']['Args']['Path'] + + @property + def id(self) -> int: + 'The ID of the source.' + return self.__payload['ID'] + + def get_remaining(self) -> timedelta: + "The estimated time remaining in the source's current loop." + data = self.__get_source() + if not data: + return timedelta(millaseconds=0) + + return timedelta(milliseconds=data['Remaining']) + + remaining: int = property(get_remaining) + 'Property wrapper for :py:meth:`~replit.Source.get_remaining`' + + def get_end_time(self) -> datetime or None: + '''The estimated time when the sourcce will be done playing. + Returns None if the source has finished playing. + Note: this is the estimation for the end of the current loop.''' + s = self.__get_source() + if not s: + return None + + timestamp_str = s['EndTime'] + timestamp = datetime.strptime( + timestamp_str[:-4], "%Y-%m-%dT%H:%M:%S.%f") + return timestamp + + end_time: datetime or None = property(get_end_time) + 'Property wrapper for :py:meth:`~replit.Source.get_end_time`' + + @property + def does_loop(self) -> bool: + 'Wether the source repeats itself or not.' + return self._loops + + @property + def duration(self) -> timedelta: + 'The duration of the source.' + return timedelta(millaseconds=self.__payload['Duration']) + + def get_volume(self) -> float: + 'The volume the source is set to.' + self.__get_source() + return self.__payload['Volume'] + + def set_volume(self, volume: float): + ''' + Parameters + ---------- + volume: float + The volume the source should be set to. + + Raises + ------ + NoSuchSourceException + If the source is no longer known to the audio manager. + ''' + self.__update_source(volume=volume) + + volume: float = property(get_volume, set_volume) + 'Property wrapper for :py:meth:`~replit.Source.get_volume` and :py:meth:`~replit.Source.set_volume`' + + def get_paused(self) -> bool: + 'Wether the source is paused or not.' + self.__get_source() + return self.__payload['Paused'] + + def set_paused(self, paused: bool): + ''' + Parameters + ---------- + paused: bool + Wether the source should be paused or not. + + Raises + ------ + NoSuchSourceException + If the source is no longer known to the audio manager. + ''' + self.__update_source(paused=paused) + + paused = property(get_paused, set_paused) + 'Property wrapper for :py:meth:`~replit.Source.get_paused` and :py:meth:`~replit.Source.set_paused`' + + def get_loops_remaining(self) -> int or None: + '''The remaining amount of times the file will restart. Returns none if the source is done playing. + + Returns + ------- + int + The number of loops remaining + None + The source can't be found, either because it has finished playing or an error occured. + + ''' + if not self._loops: + return 0 + + s = self.__get_source() + if not s: + return None + + if s['ID'] == self.id: + loops = s['Loop'] + + return loops + + def set_loop(self, loop_count: int) -> None: + '''Set the remaining amount of loops for the source. + Set loop_count to a negative value to repeat forever. + + Parameters + ---------- + does_loop: bool + Wether the source should be paused or not. + loop_count: int + How many times the source should repeat itself. Set to a negative value for infinite. + + Raises + ------ + NoSuchSourceException + If the source is no longer known to the audio manager. + ''' + + does_loop = loop_count != 0 + self._loops = does_loop + self.__update_source(doesLoop=does_loop, loopCount=loop_count) + + loops_remaining: int or None = property(get_loops_remaining) + 'Property wrapper for :py:meth:`~replit.Source.get_loops_remaining`' + + def toggle_playing(self) -> None: + '''Play/pause the source.''' + self.set_paused(not self.paused) + + +class Audio(): + '''The basic audio manager. + + Notes + ----- + This is not intended to be called directly, instead use :py:const:`audio`. + + Using this in addition to `audio` can cause **major** issues. + ''' + __known_ids = [] + __names_created = 0 + + def __gen_name() -> str: + return f'Source {time.time()}' + + def __get_new_source(self, name: str, does_loop: bool) -> Source: + new_source = None + timeOut = datetime.now() + timedelta(seconds=2) + + while not new_source and datetime.now() < timeOut: + try: + sources = AudioStatus(self.read_status())['Sources'] + new_source = SourceData([ + s for s in sources if s['Name'] == name + ][0]) + except IndexError: + pass + except json.JSONDecodeError: + pass + + if not new_source: + raise TimeoutError(f'Source was not created within 2 seconds.') + + return Source(new_source, does_loop) + + def play_file( + self, + file_path: str, + volume: float = 1, + does_loop: bool = False, + loop_count: int = 0, + name: str = __gen_name() + ) -> Source: + '''Sends a request to play a file, assuming the file is valid. + + Parameters + ---------- + file_path: str + The path to the file that should be played. Can be absolute or relative. + volume: float, optional + The volume the source should be played at. (1 being 100%) + does_loop: bool, optional + Wether the source should repeat itself or not. Note, if you set this you should also set loop_count. + loop_count: int, optional + How many times the source should repeat itself. Set to 0 to have the source play only once, + or set to a negative value for the source to repeat forever. + name: str, optional + The name of the file. Default value is a unique name for the source. + + Returns + ------- + Source + The source created with the provided data. + + Raises + ------ + FileNotFoundError + If the file is not found. + InvalidFileType + If the file type is not valid. + ValueError + If the type is not a valid type for a source. + ''' + if not path.exists(file_path): + raise FileNotFoundError(f'File "{file_path}" not found.') + + file_type = file_path.split('.')[-1] + + if ReaderType(file_type) not in file_types: + raise InvalidFileType(f'Type {file_type} is not supported.') + + data = RequestData( + Type=file_type, + Volume=volume, + DoesLoop=does_loop, + LoopCount=loop_count, + Name=name, + Args=RequestArgs( + Path=file_path + ) + ) + + with open('/tmp/audio', 'w') as p: + p.write(json.dumps(dict(data))) + + return self.__get_new_source(name, does_loop) + + def play_tone( + self, + duration: float, + pitch: int, + wave_type: WaveType, + does_loop: bool = False, + loop_count: int = 0, + volume: float = 1, + name: str = __gen_name(), + ) -> Source: + '''Play a tone from a frequency and wave type. + + Parameters + ---------- + duration: float + How long the tone should be played (in seconds). + pitch: int + The frequency the tone should be played at. + wave_type: WaveType + The wave shape used to generate the tone. + volume: float + The volume the tone should be played at (1 being 100%). + name: str + The name of the source. + + Returns + ------- + Source + The source for the tone. + + Raises + ------ + TimeoutError + If the source isn't found after 2 seconds. + ValueError + If the wave type isn't valid. + ''' + + # ensure the wave type is valid. This will throw an error if it isn't. + WaveType(wave_type) + + data = RequestData( + Name=name, + DoesLoop=does_loop, + LoopCount=loop_count, + Volume=volume, + Type=str(ReaderType.tone), + Args=RequestArgs( + WaveType=wave_type, + Pitch=pitch, + Seconds=duration, + ) + ) + + with open('/tmp/audio', 'w') as f: + f.write(json.dumps(data)) + + return self.__get_new_source(name, does_loop) + + def get_source(self, source_id: int) -> Source or None: + '''Get a source by it's ID + + Parameters + ---------- + source_id: int + The ID for the source that should be found. + + Returns + ------- + Source + The source with the ID provided. + + Raises + ------ + :py:exc:`~replit.NoSourceFoundException` + If the source isnt found or there isn't any sources known to the audio manager. + ''' + source = None + with open('/tmp/audioStatus.json', 'r') as f: + data = AudioStatus(json.loads(f.read())) + if not data['Sources']: + raise NoSuchSourceException('No sources exist yet.') + for s in data['Sources']: + + if s['ID'] == int(source_id): + source = s + break + if not source: + raise NoSuchSourceException( + f'Could not find source with ID "{source_id}"') + return Source(source, source['Loop']) + + def read_status(self) -> AudioStatus: + '''Get the raw data for what's playing. This is an api call, and shouldn't be needed + for general usage. + + Returns + ------- + AudioStaus + The contents of /tmp/audioStatus.json + ''' + with open('/tmp/audioStatus.json', 'r') as f: + data = AudioStatus(json.loads(f.read())) + if data['Sources'] == None: + data['Sources']: List[SourceData] = [] + return data + + def get_playing(self) -> List[Source]: + '''Get a list of playing sources. + + Returns + ------- + List[Source] + A list of sources that aren't paused. + ''' + data = self.read_status() + sources = data['Sources'] + return [Source(s, s['Loop']) for s in sources if not s['Paused']] + + def get_paused(self) -> List[Source]: + '''Get a list of paused sources. + + Returns + ------- + List[Source] + A list of sources that are paused. + + ''' + data = self.read_status() + sources = data['Sources'] + return [Source(s, s['Loop']) for s in sources if s['Paused']] + + def get_sources(self) -> List[Source]: + '''Gets all sources. + + Returns + ------- + List[Source] + Every source known to the audio manager, paused or playing. + + ''' + data = self.read_status() + sources = data['Sources'] + return [Source(s, s['Loop']) for s in sources] + + +audio = Audio() +'''The interface used for all things audio. +Can be used to play and fetch audio sources. + +''' diff --git a/src/replit/audio/replit/types.py b/src/replit/audio/replit/types.py new file mode 100644 index 0000000..aca613d --- /dev/null +++ b/src/replit/audio/replit/types.py @@ -0,0 +1,112 @@ +from typing import List +from typing_extensions import TypedDict +from enum import Enum + + +class ReaderType(Enum): + 'An Enum for the types of sources.' + + def __str__(self) -> str: + return self._value_ + + def __repr__(self) -> str: + return f'ReaderType.{self._name_}' + + wav_file = 'wav' + 'ReaderType : The type for a .wav file.' + aiff_file = 'aiff' + 'ReaderType : The type for a .aiff file.' + mp3_file = 'mp3' + 'ReaderType : The type for a .mp3 file.' + tone = 'tone' + 'ReaderType : The type for a generated tone.' + + +class WaveType(Enum): + 'The different wave shapes that can be used for tone generation.' + + def __str__(self) -> str: + return self._value_ + + WaveSine = 0 + 'WaveType : The WaveSine wave shape.' + WaveTriangle = 1 + 'WaveType : The Triangle wave shape.' + WaveSaw = 2 + 'WaveType : The Saw wave shape.' + WaveSqr = 3 + 'WaveType : The Square wave shape.' + + +file_types: List[ReaderType] = [ReaderType.aiff_file, + ReaderType.wav_file, ReaderType.mp3_file] +'The different file types for sources in a list.' + + +class RequestArgs(TypedDict, total=False): + 'The additional arguments for a request that are type-specific.' + Pitch: float + 'float : The pitch/frequency of the tone. Only used if the request type is tone.' + Seconds: float + + 'float : The duration for the tone to be played. Only used if the request type is tone.' + WaveType: WaveType or int + 'WaveType : The wave type of the tone. Only used if the request type is tone.' + Path: str + 'str : The path to the file to be read. Only used if the request is for a file type.' + + +class RequestData(TypedDict): + 'A request to pid1 for a source to be played.' + ID: int + 'int : The ID of the source. Only used for updating a pre-existing source.' + Paused: bool or None + 'bool or None : Wether the source with the provided ID should be paused or not. Can only be used when updating a source.' + Volume: float + 'float : The volume the source should be played at. (1 being 100%)' + DoesLoop: bool + 'bool : Wether the source should loop / repeat or not. Defaults to false.' + LoopCount: int + 'int : How many times the source should loop / repeat. Defaults to 0.' + Name: str + 'str : The name of the source.' + Type: ReaderType or str + 'ReaderType : The type of the source.' + Args: RequestArgs + 'RequestArgs : The additional arguments for the source.' + + +class SourceData(TypedDict): + '''A source's raw data, as a payload.''' + Name: str + 'str : The name of the source.' + Type: str + 'str : The type of the source.' + Volume: float + 'float : The volume of the source.' + Duration: int + 'int : The duration of the source in milliseconds.' + Remaining: int + 'int : How many more milliseconds the source will be playing.' + Paused: bool + 'bool : Wether the source is paused or not.' + Loop: int + 'int : How many times the source will loop. If 0, the source will not repeat itself.' + ID: int + 'int : The ID of the source.' + EndTime: str + 'str : The estimated timestamp for when the source will finish playing.' + StartTime: str + 'str : When the source started playing.' + Request: RequestData + 'RequestData : The request used to create the source.' + + +class AudioStatus(TypedDict): + 'The raw data read from /tmp/audioStatus.json.' + Sources: List[SourceData] or None + 'List[SourceData] : The sources that are know to the audio manager.' + Running: bool + 'bool : Wether the audio manager knows any sources or not.' + Disabled: bool + 'bool : Wether the audio manager is disabled or not.' diff --git a/src/replit/audio/test.py b/src/replit/audio/test.py new file mode 100644 index 0000000..5750882 --- /dev/null +++ b/src/replit/audio/test.py @@ -0,0 +1,67 @@ +import time +import unittest +import replit +from replit import audio, types +from replit.types import WaveType + +test_file = '../test.mp3' + + +class TestAudio(unittest.TestCase): + + def test_creation(self): + source = audio.play_file(test_file) + self.assertEqual(source.path, test_file) + source.paused = True + time.sleep(1) + self.assertEqual(source.paused, True, 'Pausing Source') + + def test_pause(self): + source = audio.play_file(test_file) + source.volume = 2 + time.sleep(1) + self.assertEqual(source.volume, 2, "Volume set to 2") + + source.paused = True + time.sleep(1) + self.assertEqual(source.paused, True, 'Pausing Source') + + source.volume = .2 + time.sleep(1) + self.assertEqual(source.volume, .2, 'Volume set to .2') + + source.paused = True + time.sleep(1) + self.assertEqual(source.paused, True, 'Pausing Source') + + def test_loop_setting(self): + source = audio.play_file(test_file) + + self.assertEqual(source.loops_remaining, 0, '0 loops remaining') + source.set_loop(2) + time.sleep(1) + + self.assertEqual(source.loops_remaining, 2, '2 loops remaining') + source.paused = True + time.sleep(1) + self.assertEqual(source.paused, True, 'Pausing Source') + + def test_other(self): + source = audio.play_file(test_file) + + self.assertIsNotNone(source.end_time) + self.assertIsNotNone(source.start_time) + self.assertIsNotNone(source.remaining) + source.paused = True + time.sleep(1) + self.assertEqual(source.paused, True, 'Pausing Source') + + def test_tones(self): + try: + audio.play_tone(2, 400, 2) + except TimeoutError or ValueError as e: + self.fail(e) + + +if __name__ == '__main__': + unittest.main()