Add audio code

This commit is contained in:
Scoder12
2020-07-27 14:03:09 -07:00
parent 5190b8a9a6
commit 736683f28e
8 changed files with 820 additions and 0 deletions
+18
View File
@@ -0,0 +1,18 @@
.PHONY: test, docs
test:
@python3 test.py
docs:
@cd ./docs && make
docs-%:
@echo $(shell echo $@ | cut -c6-)
@cd ./docs && make $(shell echo $@ | cut -c6-)
+20
View File
@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#
# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build
# Put it first so that "make" without argument is like "make help".
help:
@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
.PHONY: help Makefile clean
# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile clean
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+59
View File
@@ -0,0 +1,59 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
sys.path.append(os.path.abspath('../'))
# -- Project information -----------------------------------------------------
project = 'replit'
copyright = '2020, repl.it'
author = 'repl.it'
# The full version, including alpha/beta/rc tags
release = '1.2.0'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.napoleon',
'sphinx_autodoc_typehints'
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'conf.py']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'groundwork'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
+36
View File
@@ -0,0 +1,36 @@
.. replit documentation master file, created by
sphinx-quickstart on Mon Jun 22 18:35:18 2020.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
(python) replit api reference.
==============================
.. toctree::
:maxdepth: 2
:caption: Contents:
Module contents
---------------
.. automodule:: replit
:members:
:undoc-members:
:show-inheritance:
replit.types module
-------------------
.. automodule:: replit.types
:members:
:undoc-members:
:show-inheritance:
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
+35
View File
@@ -0,0 +1,35 @@
@ECHO OFF
pushd %~dp0
REM Command file for Sphinx documentation
if "%SPHINXBUILD%" == "" (
set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build
if "%1" == "" goto help
%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
echo.
echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
echo.installed, then set the SPHINXBUILD environment variable to point
echo.to the full path of the 'sphinx-build' executable. Alternatively you
echo.may add the Sphinx directory to PATH.
echo.
echo.If you don't have Sphinx installed, grab it from
echo.http://sphinx-doc.org/
exit /b 1
)
%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end
:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
:end
popd
+473
View File
@@ -0,0 +1,473 @@
import json
import time
from replit.types import ReaderType, RequestArgs, RequestData, SourceData, AudioStatus, WaveType, file_types
from typing import List
from datetime import datetime, timedelta
from os import path
def clear():
'Clear is used to clear the terminal.'
print('\033[H\033[2J', end='', flush=True)
class InvalidFileType(Exception):
"Exception for when a requested file's type isnt valid"
pass
class NoSuchSourceException(Exception):
"Exception used when a source doesn't exist"
pass
class Source:
'''A Source is used to get audio that is sent to the user.
Parameters
----------
payload : :py:class:`~replit.types.SourceData`
The payload for the source.
loops : int
How many times the source should loop.
'''
__payload: SourceData
_loops: bool
_name: str
def __init__(self, payload: SourceData, loops: bool):
self.__payload = payload
self._loops = loops
self._name = payload['Name']
def __get_source(self) -> SourceData or None:
source = None
with open('/tmp/audioStatus.json', 'r') as f:
data = json.loads(f.read())
for s in data['Sources']:
if s['ID'] == self.id:
source = s
break
if source:
self.__payload = source
return source
def __update_source(self, **changes):
s = self.__get_source()
if not s:
raise NoSuchSourceException(
f'No player with id "{id}" found! It might be done playing.')
s.update({key.title(): changes[key] for key in changes})
with open('/tmp/audio', 'w') as f:
f.write(json.dumps(s))
self.__get_source()
@property
def name(self) -> str:
'The name of the source'
return self._name
def get_start_time(self) -> datetime:
'When the source started plaing'
timestamp_str = self.__payload['StartTime']
timestamp = datetime.strptime(
timestamp_str[:-4], "%Y-%m-%dT%H:%M:%S.%f")
return timestamp
start_time: datetime = property(get_start_time)
'Property wrapper for :py:meth:`~replit.Source.get_start_time`'
@property
def path(self) -> str or None:
'The path to the source, if available.'
data = self.__payload
if ReaderType(data['Type']) in file_types:
return self.__payload['Request']['Args']['Path']
@property
def id(self) -> int:
'The ID of the source.'
return self.__payload['ID']
def get_remaining(self) -> timedelta:
"The estimated time remaining in the source's current loop."
data = self.__get_source()
if not data:
return timedelta(millaseconds=0)
return timedelta(milliseconds=data['Remaining'])
remaining: int = property(get_remaining)
'Property wrapper for :py:meth:`~replit.Source.get_remaining`'
def get_end_time(self) -> datetime or None:
'''The estimated time when the sourcce will be done playing.
Returns None if the source has finished playing.
Note: this is the estimation for the end of the current loop.'''
s = self.__get_source()
if not s:
return None
timestamp_str = s['EndTime']
timestamp = datetime.strptime(
timestamp_str[:-4], "%Y-%m-%dT%H:%M:%S.%f")
return timestamp
end_time: datetime or None = property(get_end_time)
'Property wrapper for :py:meth:`~replit.Source.get_end_time`'
@property
def does_loop(self) -> bool:
'Wether the source repeats itself or not.'
return self._loops
@property
def duration(self) -> timedelta:
'The duration of the source.'
return timedelta(millaseconds=self.__payload['Duration'])
def get_volume(self) -> float:
'The volume the source is set to.'
self.__get_source()
return self.__payload['Volume']
def set_volume(self, volume: float):
'''
Parameters
----------
volume: float
The volume the source should be set to.
Raises
------
NoSuchSourceException
If the source is no longer known to the audio manager.
'''
self.__update_source(volume=volume)
volume: float = property(get_volume, set_volume)
'Property wrapper for :py:meth:`~replit.Source.get_volume` and :py:meth:`~replit.Source.set_volume`'
def get_paused(self) -> bool:
'Wether the source is paused or not.'
self.__get_source()
return self.__payload['Paused']
def set_paused(self, paused: bool):
'''
Parameters
----------
paused: bool
Wether the source should be paused or not.
Raises
------
NoSuchSourceException
If the source is no longer known to the audio manager.
'''
self.__update_source(paused=paused)
paused = property(get_paused, set_paused)
'Property wrapper for :py:meth:`~replit.Source.get_paused` and :py:meth:`~replit.Source.set_paused`'
def get_loops_remaining(self) -> int or None:
'''The remaining amount of times the file will restart. Returns none if the source is done playing.
Returns
-------
int
The number of loops remaining
None
The source can't be found, either because it has finished playing or an error occured.
'''
if not self._loops:
return 0
s = self.__get_source()
if not s:
return None
if s['ID'] == self.id:
loops = s['Loop']
return loops
def set_loop(self, loop_count: int) -> None:
'''Set the remaining amount of loops for the source.
Set loop_count to a negative value to repeat forever.
Parameters
----------
does_loop: bool
Wether the source should be paused or not.
loop_count: int
How many times the source should repeat itself. Set to a negative value for infinite.
Raises
------
NoSuchSourceException
If the source is no longer known to the audio manager.
'''
does_loop = loop_count != 0
self._loops = does_loop
self.__update_source(doesLoop=does_loop, loopCount=loop_count)
loops_remaining: int or None = property(get_loops_remaining)
'Property wrapper for :py:meth:`~replit.Source.get_loops_remaining`'
def toggle_playing(self) -> None:
'''Play/pause the source.'''
self.set_paused(not self.paused)
class Audio():
'''The basic audio manager.
Notes
-----
This is not intended to be called directly, instead use :py:const:`audio`.
Using this in addition to `audio` can cause **major** issues.
'''
__known_ids = []
__names_created = 0
def __gen_name() -> str:
return f'Source {time.time()}'
def __get_new_source(self, name: str, does_loop: bool) -> Source:
new_source = None
timeOut = datetime.now() + timedelta(seconds=2)
while not new_source and datetime.now() < timeOut:
try:
sources = AudioStatus(self.read_status())['Sources']
new_source = SourceData([
s for s in sources if s['Name'] == name
][0])
except IndexError:
pass
except json.JSONDecodeError:
pass
if not new_source:
raise TimeoutError(f'Source was not created within 2 seconds.')
return Source(new_source, does_loop)
def play_file(
self,
file_path: str,
volume: float = 1,
does_loop: bool = False,
loop_count: int = 0,
name: str = __gen_name()
) -> Source:
'''Sends a request to play a file, assuming the file is valid.
Parameters
----------
file_path: str
The path to the file that should be played. Can be absolute or relative.
volume: float, optional
The volume the source should be played at. (1 being 100%)
does_loop: bool, optional
Wether the source should repeat itself or not. Note, if you set this you should also set loop_count.
loop_count: int, optional
How many times the source should repeat itself. Set to 0 to have the source play only once,
or set to a negative value for the source to repeat forever.
name: str, optional
The name of the file. Default value is a unique name for the source.
Returns
-------
Source
The source created with the provided data.
Raises
------
FileNotFoundError
If the file is not found.
InvalidFileType
If the file type is not valid.
ValueError
If the type is not a valid type for a source.
'''
if not path.exists(file_path):
raise FileNotFoundError(f'File "{file_path}" not found.')
file_type = file_path.split('.')[-1]
if ReaderType(file_type) not in file_types:
raise InvalidFileType(f'Type {file_type} is not supported.')
data = RequestData(
Type=file_type,
Volume=volume,
DoesLoop=does_loop,
LoopCount=loop_count,
Name=name,
Args=RequestArgs(
Path=file_path
)
)
with open('/tmp/audio', 'w') as p:
p.write(json.dumps(dict(data)))
return self.__get_new_source(name, does_loop)
def play_tone(
self,
duration: float,
pitch: int,
wave_type: WaveType,
does_loop: bool = False,
loop_count: int = 0,
volume: float = 1,
name: str = __gen_name(),
) -> Source:
'''Play a tone from a frequency and wave type.
Parameters
----------
duration: float
How long the tone should be played (in seconds).
pitch: int
The frequency the tone should be played at.
wave_type: WaveType
The wave shape used to generate the tone.
volume: float
The volume the tone should be played at (1 being 100%).
name: str
The name of the source.
Returns
-------
Source
The source for the tone.
Raises
------
TimeoutError
If the source isn't found after 2 seconds.
ValueError
If the wave type isn't valid.
'''
# ensure the wave type is valid. This will throw an error if it isn't.
WaveType(wave_type)
data = RequestData(
Name=name,
DoesLoop=does_loop,
LoopCount=loop_count,
Volume=volume,
Type=str(ReaderType.tone),
Args=RequestArgs(
WaveType=wave_type,
Pitch=pitch,
Seconds=duration,
)
)
with open('/tmp/audio', 'w') as f:
f.write(json.dumps(data))
return self.__get_new_source(name, does_loop)
def get_source(self, source_id: int) -> Source or None:
'''Get a source by it's ID
Parameters
----------
source_id: int
The ID for the source that should be found.
Returns
-------
Source
The source with the ID provided.
Raises
------
:py:exc:`~replit.NoSourceFoundException`
If the source isnt found or there isn't any sources known to the audio manager.
'''
source = None
with open('/tmp/audioStatus.json', 'r') as f:
data = AudioStatus(json.loads(f.read()))
if not data['Sources']:
raise NoSuchSourceException('No sources exist yet.')
for s in data['Sources']:
if s['ID'] == int(source_id):
source = s
break
if not source:
raise NoSuchSourceException(
f'Could not find source with ID "{source_id}"')
return Source(source, source['Loop'])
def read_status(self) -> AudioStatus:
'''Get the raw data for what's playing. This is an api call, and shouldn't be needed
for general usage.
Returns
-------
AudioStaus
The contents of /tmp/audioStatus.json
'''
with open('/tmp/audioStatus.json', 'r') as f:
data = AudioStatus(json.loads(f.read()))
if data['Sources'] == None:
data['Sources']: List[SourceData] = []
return data
def get_playing(self) -> List[Source]:
'''Get a list of playing sources.
Returns
-------
List[Source]
A list of sources that aren't paused.
'''
data = self.read_status()
sources = data['Sources']
return [Source(s, s['Loop']) for s in sources if not s['Paused']]
def get_paused(self) -> List[Source]:
'''Get a list of paused sources.
Returns
-------
List[Source]
A list of sources that are paused.
'''
data = self.read_status()
sources = data['Sources']
return [Source(s, s['Loop']) for s in sources if s['Paused']]
def get_sources(self) -> List[Source]:
'''Gets all sources.
Returns
-------
List[Source]
Every source known to the audio manager, paused or playing.
'''
data = self.read_status()
sources = data['Sources']
return [Source(s, s['Loop']) for s in sources]
audio = Audio()
'''The interface used for all things audio.
Can be used to play and fetch audio sources.
'''
+112
View File
@@ -0,0 +1,112 @@
from typing import List
from typing_extensions import TypedDict
from enum import Enum
class ReaderType(Enum):
'An Enum for the types of sources.'
def __str__(self) -> str:
return self._value_
def __repr__(self) -> str:
return f'ReaderType.{self._name_}'
wav_file = 'wav'
'ReaderType : The type for a .wav file.'
aiff_file = 'aiff'
'ReaderType : The type for a .aiff file.'
mp3_file = 'mp3'
'ReaderType : The type for a .mp3 file.'
tone = 'tone'
'ReaderType : The type for a generated tone.'
class WaveType(Enum):
'The different wave shapes that can be used for tone generation.'
def __str__(self) -> str:
return self._value_
WaveSine = 0
'WaveType : The WaveSine wave shape.'
WaveTriangle = 1
'WaveType : The Triangle wave shape.'
WaveSaw = 2
'WaveType : The Saw wave shape.'
WaveSqr = 3
'WaveType : The Square wave shape.'
file_types: List[ReaderType] = [ReaderType.aiff_file,
ReaderType.wav_file, ReaderType.mp3_file]
'The different file types for sources in a list.'
class RequestArgs(TypedDict, total=False):
'The additional arguments for a request that are type-specific.'
Pitch: float
'float : The pitch/frequency of the tone. Only used if the request type is tone.'
Seconds: float
'float : The duration for the tone to be played. Only used if the request type is tone.'
WaveType: WaveType or int
'WaveType : The wave type of the tone. Only used if the request type is tone.'
Path: str
'str : The path to the file to be read. Only used if the request is for a file type.'
class RequestData(TypedDict):
'A request to pid1 for a source to be played.'
ID: int
'int : The ID of the source. Only used for updating a pre-existing source.'
Paused: bool or None
'bool or None : Wether the source with the provided ID should be paused or not. Can only be used when updating a source.'
Volume: float
'float : The volume the source should be played at. (1 being 100%)'
DoesLoop: bool
'bool : Wether the source should loop / repeat or not. Defaults to false.'
LoopCount: int
'int : How many times the source should loop / repeat. Defaults to 0.'
Name: str
'str : The name of the source.'
Type: ReaderType or str
'ReaderType : The type of the source.'
Args: RequestArgs
'RequestArgs : The additional arguments for the source.'
class SourceData(TypedDict):
'''A source's raw data, as a payload.'''
Name: str
'str : The name of the source.'
Type: str
'str : The type of the source.'
Volume: float
'float : The volume of the source.'
Duration: int
'int : The duration of the source in milliseconds.'
Remaining: int
'int : How many more milliseconds the source will be playing.'
Paused: bool
'bool : Wether the source is paused or not.'
Loop: int
'int : How many times the source will loop. If 0, the source will not repeat itself.'
ID: int
'int : The ID of the source.'
EndTime: str
'str : The estimated timestamp for when the source will finish playing.'
StartTime: str
'str : When the source started playing.'
Request: RequestData
'RequestData : The request used to create the source.'
class AudioStatus(TypedDict):
'The raw data read from /tmp/audioStatus.json.'
Sources: List[SourceData] or None
'List[SourceData] : The sources that are know to the audio manager.'
Running: bool
'bool : Wether the audio manager knows any sources or not.'
Disabled: bool
'bool : Wether the audio manager is disabled or not.'
+67
View File
@@ -0,0 +1,67 @@
import time
import unittest
import replit
from replit import audio, types
from replit.types import WaveType
test_file = '../test.mp3'
class TestAudio(unittest.TestCase):
def test_creation(self):
source = audio.play_file(test_file)
self.assertEqual(source.path, test_file)
source.paused = True
time.sleep(1)
self.assertEqual(source.paused, True, 'Pausing Source')
def test_pause(self):
source = audio.play_file(test_file)
source.volume = 2
time.sleep(1)
self.assertEqual(source.volume, 2, "Volume set to 2")
source.paused = True
time.sleep(1)
self.assertEqual(source.paused, True, 'Pausing Source')
source.volume = .2
time.sleep(1)
self.assertEqual(source.volume, .2, 'Volume set to .2')
source.paused = True
time.sleep(1)
self.assertEqual(source.paused, True, 'Pausing Source')
def test_loop_setting(self):
source = audio.play_file(test_file)
self.assertEqual(source.loops_remaining, 0, '0 loops remaining')
source.set_loop(2)
time.sleep(1)
self.assertEqual(source.loops_remaining, 2, '2 loops remaining')
source.paused = True
time.sleep(1)
self.assertEqual(source.paused, True, 'Pausing Source')
def test_other(self):
source = audio.play_file(test_file)
self.assertIsNotNone(source.end_time)
self.assertIsNotNone(source.start_time)
self.assertIsNotNone(source.remaining)
source.paused = True
time.sleep(1)
self.assertEqual(source.paused, True, 'Pausing Source')
def test_tones(self):
try:
audio.play_tone(2, 400, 2)
except TimeoutError or ValueError as e:
self.fail(e)
if __name__ == '__main__':
unittest.main()