From 77bd0489d5923b3f52eef945d4119b1ee2111edd Mon Sep 17 00:00:00 2001
From: Yotam Sinvani
Date: Sat, 20 Aug 2022 19:25:06 +0300
Subject: [PATCH] integrate midi stuff

---
 basic_pitch/inference.py | 264 +++++++++++++++++++++++++++++++++------
 setup.cfg                |   2 +-
 tests/test_inference.py  |   5 +
 3 files changed, 232 insertions(+), 39 deletions(-)

diff --git a/basic_pitch/inference.py b/basic_pitch/inference.py
index 1393ff37..b3191bfc 100644
--- a/basic_pitch/inference.py
+++ b/basic_pitch/inference.py
@@ -20,19 +20,16 @@
 import json
 import os
 import pathlib
+import wave
 from typing import Dict, List, Optional, Sequence, Tuple, Union

-from tensorflow import Tensor, signal, keras, saved_model
-import numpy as np
 import librosa
+import numpy as np
 import pretty_midi
+import pyaudio
+from numpy import ndarray
+from tensorflow import Tensor, signal, keras, saved_model

-from basic_pitch.constants import (
-    AUDIO_SAMPLE_RATE,
-    AUDIO_N_SAMPLES,
-    ANNOTATIONS_FPS,
-    FFT_HOP,
-)
 from basic_pitch import ICASSP_2022_MODEL_PATH, note_creation as infer
 from basic_pitch.commandline_printing import (
     entertaining_waiting,
@@ -41,9 +38,15 @@
     file_saved_confirmation,
     failed_to_save,
 )
+from basic_pitch.constants import (
+    AUDIO_SAMPLE_RATE,
+    AUDIO_N_SAMPLES,
+    ANNOTATIONS_FPS,
+    FFT_HOP,
+)


-def window_audio_file(audio_original: Tensor, hop_size: int) -> Tuple[Tensor, List[Dict[str, int]]]:
+def window_audio_file(audio_original: ndarray, hop_size: int) -> Tuple[Tensor, List[Dict[str, int]]]:
     """
     Pad appropriately an audio file, and return as
     windowed signal, with window length = AUDIO_N_SAMPLES
@@ -71,7 +74,7 @@ def window_audio_file(audio_original: Tensor, hop_size: int) -> Tuple[Tensor, Li


 def get_audio_input(
-    audio_path: Union[pathlib.Path, str], overlap_len: int, hop_size: int
+        audio_path: Union[pathlib.Path, str], overlap_len: int, hop_size: int
 ) -> Tuple[Tensor, List[Dict[str, int]], int]:
     """
     Read wave file (as mono), pad appropriately, and return as
@@ -122,7 +125,7 @@ def unwrap_output(output: Tensor, audio_original_length: int, n_overlapping_fram


 def run_inference(
-    audio_path: Union[pathlib.Path, str], model: keras.Model, debug_file: Optional[pathlib.Path] = None
+        audio_path: Union[pathlib.Path, str], model: keras.Model, debug_file: Optional[pathlib.Path] = None
 ) -> Dict[str, np.array]:
     """Run the model on the input audio path.

@@ -160,6 +163,52 @@ def run_inference(
     return unwrapped_output


+def run_inference_rt(
+        audio_data: ndarray,
+        model: keras.Model,
+        debug_file: Optional[pathlib.Path] = None
+) -> Dict[str, np.array]:
+    """Run the model on a chunk of audio samples.
+
+    Args:
+        audio_data: The audio samples to run inference on.
+        model: A loaded keras model to run inference with.
+        debug_file: An optional path to output debug data to. Useful for testing/verification.
+
+    Returns:
+        A dictionary with the notes, onsets and contours from model inference.
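+
+    Unlike run_inference, this variant takes an in-memory chunk of samples
+    rather than a file path; predict_rt feeds it one model window at a time.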
+    """
+    # overlap consecutive windows by 5 frames
+    n_overlapping_frames = 5
+    overlap_len = n_overlapping_frames * FFT_HOP
+    hop_size = AUDIO_N_SAMPLES - overlap_len
+
+    audio_length = audio_data.shape[0]
+    # each chunk is exactly one model window, so hop by the full chunk length
+    audio_windowed, window_times = window_audio_file(audio_data, audio_length)
+
+    output = model(audio_windowed)
+    unwrapped_output = {k: unwrap_output(output[k], audio_length, n_overlapping_frames) for k in output}
+
+    if debug_file:
+        with open(debug_file, "a") as f:
+            json.dump(
+                {
+                    "audio_windowed": audio_windowed.numpy().tolist(),
+                    "audio_original_length": audio_length,
+                    "hop_size_samples": hop_size,
+                    "overlap_length_samples": overlap_len,
+                    "unwrapped_output": {k: v.tolist() for k, v in unwrapped_output.items()},
+                },
+                f,
+            )
+
+    return unwrapped_output
+
+
 class OutputExtensions(enum.Enum):
     MIDI = "mid"
     MODEL_OUTPUT_NPZ = "npz"
@@ -200,7 +249,7 @@ def verify_output_dir(output_dir: Union[pathlib.Path, str]) -> None:


 def build_output_path(
-    audio_path: Union[pathlib.Path, str], output_directory: Union[pathlib.Path, str], output_type: OutputExtensions
+        audio_path: Union[pathlib.Path, str], output_directory: Union[pathlib.Path, str], output_type: OutputExtensions
 ) -> pathlib.Path:
     """Create an output path and make sure it doesn't already exist.
@@ -235,7 +284,7 @@ def build_output_path(


 def save_note_events(
-    note_events: List[Tuple[float, float, int, float, Optional[List[int]]]], save_path: Union[pathlib.Path, str]
+        note_events: List[Tuple[float, float, int, float, Optional[List[int]]]], save_path: Union[pathlib.Path, str]
 ) -> None:
     """Save note events to file
@@ -256,16 +305,16 @@ def save_note_events(


 def predict(
-    audio_path: Union[pathlib.Path, str],
-    model_or_model_path: Union[keras.Model, pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
-    onset_threshold: float = 0.5,
-    frame_threshold: float = 0.3,
-    minimum_note_length: float = 58,
-    minimum_frequency: Optional[float] = None,
-    maximum_frequency: Optional[float] = None,
-    multiple_pitch_bends: bool = False,
-    melodia_trick: bool = True,
-    debug_file: Optional[pathlib.Path] = None,
+        audio_path: Union[pathlib.Path, str],
+        model_or_model_path: Union[keras.Model, pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
+        onset_threshold: float = 0.5,
+        frame_threshold: float = 0.3,
+        minimum_note_length: float = 58,
+        minimum_frequency: Optional[float] = None,
+        maximum_frequency: Optional[float] = None,
+        multiple_pitch_bends: bool = False,
+        melodia_trick: bool = True,
+        debug_file: Optional[pathlib.Path] = None,
 ) -> Tuple[Dict[str, np.array], pretty_midi.PrettyMIDI, List[Tuple[float, float, int, float, Optional[List[int]]]]]:
     """Run a single prediction.
@@ -334,22 +383,161 @@ def predict(
     return model_output, midi_data, note_events


+def predict_rt(
+        model_or_model_path: Union[keras.Model, pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
+        onset_threshold: float = 0.5,
+        frame_threshold: float = 0.3,
+        minimum_note_length: float = 58,
+        minimum_frequency: Optional[float] = None,
+        maximum_frequency: Optional[float] = None,
+        multiple_pitch_bends: bool = False,
+        melodia_trick: bool = True,
+) -> None:
+    """Record from the default input device and transcribe it in real time.
+
+    Reads hop-sized chunks from the microphone, runs inference on each chunk as
+    it arrives, and prints the estimated note events. The raw recording is saved
+    to inference.wav and debug data is appended to inference.json.
+
+    Args:
+        model_or_model_path: Path to load the Keras saved model from. Can be local or on GCS.
+        onset_threshold: Minimum energy required for an onset to be considered present.
+        frame_threshold: Minimum energy requirement for a frame to be considered present.
+        minimum_note_length: The minimum allowed note length in frames.
+        minimum_frequency: Minimum allowed output frequency, in Hz. If None, all frequencies are used.
+        maximum_frequency: Maximum allowed output frequency, in Hz. If None, all frequencies are used.
+        multiple_pitch_bends: If True, allow overlapping notes in midi file to have pitch bends.
+        melodia_trick: Use the melodia post-processing step.
+    """
+
+    # It's convenient to be able to pass in a keras saved model so if
+    # someone wants to place this function in a loop,
+    # the model doesn't have to be reloaded every function call
+    if isinstance(model_or_model_path, (pathlib.Path, str)):
+        model = saved_model.load(str(model_or_model_path))
+    else:
+        model = model_or_model_path
+
+    sample_format = pyaudio.paInt16  # 16 bits per sample
+    channels = 1
+    fs = AUDIO_SAMPLE_RATE  # Record at the model's sample rate (22050 samples per second)
+    seconds = 600  # Record for up to ten minutes
+    sound_filename = pathlib.Path('inference.wav')
+    debug_file = pathlib.Path('inference.json')
+
+    p = pyaudio.PyAudio()  # Create an interface to PortAudio
+
+    print('Recording')
+
+    # Overlap consecutive model windows by 5 frames
+    n_overlapping_frames = 5
+    overlap_len = n_overlapping_frames * FFT_HOP
+    hop_size = AUDIO_N_SAMPLES - overlap_len
+
+    stream = p.open(format=sample_format,
+                    channels=channels,
+                    rate=fs,
+                    frames_per_buffer=hop_size,
+                    input=True)
+
+    prev_data = np.zeros((int(overlap_len),), dtype=np.float32)
+
+    # Initialize array to store frames
+    frames = []
+
+    # Read hop-sized chunks for `seconds` seconds
+    for i in range(0, int(fs / hop_size * seconds)):
+        data = stream.read(hop_size)
+        frames.append(data)
+
+        data = librosa.util.buf_to_float(data)  # int16 bytes -> float32 in [-1, 1]
+        flatness = np.mean(librosa.feature.spectral_flatness(y=data))
+
+        # Debug statistics: counts of non-zero and near-zero samples in this chunk
+        data_nz = np.count_nonzero(data)
+        data_e5 = (np.abs(data) < 1e-05).sum()
+        # Skip chunks whose high spectral flatness suggests noise or silence
+        # rather than pitched content
+        if flatness > 0.001:
+            continue
+
+        # Prepend the overlap kept from the previous chunk, then save the tail
+        # of this one for the next iteration
+        data = np.concatenate([prev_data, data])
+        prev_data = data[-overlap_len:]
+
+        model_output = run_inference_rt(audio_data=data, model=model, debug_file=debug_file)
+
+        min_note_len = int(np.round(minimum_note_length / 1000 * (AUDIO_SAMPLE_RATE / FFT_HOP)))  # convert ms to frames
+        midi_data, note_events = infer.model_output_to_notes(
+            model_output,
+            onset_thresh=onset_threshold,
+            frame_thresh=frame_threshold,
+            min_note_len=min_note_len,
+            min_freq=minimum_frequency,
+            max_freq=maximum_frequency,
+            multiple_pitch_bends=multiple_pitch_bends,
+            melodia_trick=melodia_trick,
+        )
+
+        # Print each estimated note along with the chunk's debug statistics
+        for start_time, end_time, pitch, amplitude, pitch_bends in note_events:
+            note_data = (
+                int(pitch),
+                float(amplitude),
+                float(start_time),
+                float(end_time),
+                data_nz,
+                data_e5,
+                flatness,
+            )
+            print(f'{note_data}')
+
+        with open(debug_file, "a") as f:
+            json.dump(
+                {
+                    "min_note_length": min_note_len,
+                    "onset_thresh": onset_threshold,
+                    "frame_thresh": frame_threshold,
+                    "estimated_notes": [
+                        (
+                            float(start_time),
+                            float(end_time),
+                            int(pitch),
+                            float(amplitude),
+                            [int(b) for b in pitch_bends] if pitch_bends else None,
+                        )
+                        for start_time, end_time, pitch, amplitude, pitch_bends in note_events
+                    ],
+                },
+                f,
+            )
+
+    # Stop and close the stream
+    stream.stop_stream()
+    stream.close()
+    # Terminate the PortAudio interface
+    p.terminate()
+
+    print('Finished recording')
+
+    # Save the recorded data as a WAV file
+    wf = wave.open(str(sound_filename), 'wb')
+    wf.setnchannels(channels)
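+    # paInt16 is two bytes per sample; get_sample_size() keeps the WAV header
+    # consistent with the recorded stream format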
+    wf.setsampwidth(p.get_sample_size(sample_format))
+    wf.setframerate(fs)
+    wf.writeframes(b''.join(frames))
+    wf.close()
+
+
 def predict_and_save(
-    audio_path_list: Sequence[Union[pathlib.Path, str]],
-    output_directory: Union[pathlib.Path, str],
-    save_midi: bool,
-    sonify_midi: bool,
-    save_model_outputs: bool,
-    save_notes: bool,
-    model_path: Union[pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
-    onset_threshold: float = 0.5,
-    frame_threshold: float = 0.3,
-    minimum_note_length: float = 58,
-    minimum_frequency: Optional[float] = None,
-    maximum_frequency: Optional[float] = None,
-    multiple_pitch_bends: bool = False,
-    melodia_trick: bool = True,
-    debug_file: Optional[pathlib.Path] = None,
+        audio_path_list: Sequence[Union[pathlib.Path, str]],
+        output_directory: Union[pathlib.Path, str],
+        save_midi: bool,
+        sonify_midi: bool,
+        save_model_outputs: bool,
+        save_notes: bool,
+        model_path: Union[pathlib.Path, str] = ICASSP_2022_MODEL_PATH,
+        onset_threshold: float = 0.5,
+        frame_threshold: float = 0.3,
+        minimum_note_length: float = 58,
+        minimum_frequency: Optional[float] = None,
+        maximum_frequency: Optional[float] = None,
+        multiple_pitch_bends: bool = False,
+        melodia_trick: bool = True,
+        debug_file: Optional[pathlib.Path] = None,
 ) -> None:
     """Make a prediction and save the results to file.
diff --git a/setup.cfg b/setup.cfg
index 6cfd0937..d2ecc4e9 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -40,7 +40,7 @@ install_requires =
     resampy>=0.2.2
     scipy>=1.4.1
     tensorflow>=2.4.1,<2.7.0
-    typing_extensions
+    typing_extensions>=3.7,<3.11

 [options.entry_points]
 console_scripts =
diff --git a/tests/test_inference.py b/tests/test_inference.py
index 23a477e5..29c39c45 100644
--- a/tests/test_inference.py
+++ b/tests/test_inference.py
@@ -47,6 +47,11 @@ def test_predict(self) -> None:
         assert all(note_pitch_max)
         assert isinstance(note_events, list)

+    def test_predict_rt(self) -> None:
+        inference.predict_rt(
+            ICASSP_2022_MODEL_PATH,
+        )
+
     def test_predict_with_saves(self) -> None:
         test_audio_path = RESOURCES_PATH / "vocadito_10.wav"
         with tempfile.TemporaryDirectory() as tmpdir:
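
A note on trying the new entry point: predict_rt takes no audio path; it records
from the default input device via PortAudio and, as wired above, runs for 600
seconds. A minimal sketch of calling it (illustrative only, not part of the
patch; assumes a working microphone and the PortAudio system library that
pyaudio requires):

    from basic_pitch import ICASSP_2022_MODEL_PATH
    from basic_pitch.inference import predict_rt
    from tensorflow import saved_model

    # Load the saved model once so repeated calls skip deserialization,
    # mirroring the isinstance() check inside predict_rt.
    model = saved_model.load(str(ICASSP_2022_MODEL_PATH))

    # Records from the default input device, printing note events as
    # (pitch, amplitude, start, end, nonzero_count, near_zero_count, flatness)
    # tuples; audio goes to inference.wav and debug data to inference.json
    # in the current working directory.
    predict_rt(model)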