Source code for acsuite

"""Frame-based cutting/trimming/splicing of audio with VapourSynth and FFmpeg."""
__all__ = ["clip_to_timecodes", "concat", "eztrim", "f2ts"]
try:
    from ._metadata import __author__, __credits__, __date__, __version__  # type: ignore
except ImportError:
    __author__ = __credits__ = __date__ = __version__ = "unknown (portable mode)"  # type: ignore

import collections
import fractions
import functools
import os
from shutil import which
from subprocess import run
from typing import cast, Deque, Dict, List, Optional, Tuple, TYPE_CHECKING, Union
from warnings import simplefilter, warn

import vapoursynth as vs

simplefilter("always")  # display warnings

Trim = Tuple[Optional[int], Optional[int]]

# fmt: off
VALID_FFMPEG_EXTENSIONS = [
    '.aac', '.m4a', '.adts',
    '.ac3',
    '.alac', '.caf',
    '.dca', '.dts',
    '.eac3',
    '.flac',
    '.gsm',
    '.mlp',
    '.mp2', '.mp3', '.mpga',
    '.opus', '.spx', '.ogg', '.oga',
    '.pcm', '.raw',
    '.sbc',
    '.thd',
    '.tta',
    '.wav', '.w64',
    '.wma',
]
# fmt: on


[docs]def eztrim( clip: vs.VideoNode, /, trims: Union[List[Trim], Trim], audio_file: str, outfile: Optional[str] = None, *, ffmpeg_path: Optional[str] = None, quiet: bool = False, timecodes_file: Optional[str] = None, debug: bool = False, ) -> Union[Dict, str]: """ Simple trimming function that follows VapourSynth/Python slicing syntax. End frame is NOT inclusive. For a 100 frame long VapourSynth clip: >>> src = core.ffms2.Source('file.mkv') >>> clip = src[3:22]+src[23:40]+src[48]+src[50:-20]+src[-10:-5]+src[97:] >>> 'These trims can be almost directly entered as:' >>> trims = [(3, 22), (23, 40), (48, 49), (50, -20), (-10, -5), (97, None)] >>> eztrim(src, trims, 'audio_file.wav') >>> src = core.ffms2.Source('file.mkv') >>> clip = src[3:-13] >>> 'A single slice can be entered as a single tuple:' >>> eztrim(src, (3, -13), 'audio_file.wav') :param clip: Input clip needed to determine framerate for audio timecodes and ``clip.num_frames`` for negative indexing. :param trims: Either a list of 2-tuples, or one tuple of 2 ints. Empty slicing must represented with a ``None``. ``src[:10]+src[-5:]`` must be entered as ``trims=[(None, 10), (-5, None)]``. Single frame slices must be represented as a normal slice. ``src[15]`` must be entered as ``trims=(15, 16)``. :param audio_file: A string to the source audio file's location (i.e. '/path/to/audio_file.ext'). If the extension is not recognized as a valid audio file extension for FFmpeg's encoders, the audio will be re-encoded to WAV losslessly. :param outfile: Either a filename 'out.ext' or a full path '/path/to/out.ext' that will be used for the trimmed audio file. The extension will be automatically inserted for you, and if it is given, it will be overwritten by the input `audio_file`'s extension. If left blank, defaults to ``audio_file_cut.ext``. :param ffmpeg_path: Set this if ``ffmpeg`` is not in your `PATH`. If ``ffmpeg`` exists in your `PATH`, it will automatically be detected and used. :param quiet: Suppresses most console output from FFmpeg. :param timecodes_file: Timecodes v2 file (generated by vspipe, ffms2, etc.) for variable-frame-rate clips. Not needed for CFR clips. :return: Returns output file name as a string for other functions. """ # --- checking for filename issues and file extension support ------------------------------------------------------ if not os.path.isfile(audio_file): raise FileNotFoundError(f"eztrim: {audio_file} not found") audio_file_name, audio_file_ext = os.path.splitext(audio_file) codec_args = [] if audio_file_ext in VALID_FFMPEG_EXTENSIONS: codec_args += ["-c:a", "copy", "-rf64", "auto"] else: warn( f"eztrim: {audio_file_ext} is not a supported extension by FFmpeg's audio encoders, re-encoding to WAV", Warning, ) audio_file_ext = ".wav" # defaults to pcm_s16le so a 24-bit input with wrong ext will be downscaled # --- re-naming outfile if not formatted correctly ----------------------------------------------------------------- if outfile is None: outfile = audio_file_name + "_cut" + audio_file_ext elif not os.path.splitext(outfile)[1]: outfile += audio_file_ext elif os.path.splitext(outfile)[1] != audio_file_ext: warn(f"eztrim: the outfile does not have the correct extension, changing to {audio_file_ext}", Warning) outfile = os.path.splitext(outfile)[0] + audio_file_ext if os.path.isfile(outfile): raise FileExistsError(f"eztrim: {outfile} already exists") # --- checking for ffmpeg ------------------------------------------------------------------------------------------ if ffmpeg_path is None: if not which("ffmpeg"): raise FileNotFoundError("eztrim: ffmpeg executable not found in PATH") else: ffmpeg_path = which("ffmpeg") else: if not os.path.isfile(ffmpeg_path): raise FileNotFoundError(f"eztrim: ffmpeg executable at {ffmpeg_path} not found") if TYPE_CHECKING: assert isinstance(ffmpeg_path, str) # --- timecodes ---------------------------------------------------------------------------------------------------- if (timecodes_file is not None) and (not os.path.isfile(timecodes_file)): raise FileNotFoundError(f"eztrim: {timecodes_file} not found") # --- trims -------------------------------------------------------------------------------------------------------- if not isinstance(trims, (list, tuple)): raise TypeError("eztrim: trims must be a list of 2-tuples (or just one 2-tuple)") if len(trims) == 1 and isinstance(trims, list): warn( "eztrim: using a list of one 2-tuple is not recommended; for a single trim," "directly use a tuple: `trims=(5,-2)` instead of `trims=[(5,-2)]`", SyntaxWarning, ) if isinstance(trims[0], tuple): trims = trims[0] # convert nested tuple in a list to just the tuple else: raise TypeError("eztrim: the inner trim must be a tuple") if isinstance(trims, tuple): if len(trims) != 2: raise ValueError("eztrim: a single tuple trim must have 2 elements") if not all(isinstance(i, (int, type(None))) for i in trims): raise TypeError("eztrim: the trim must contain only 2 ints or Nones") if trims[-1] == 0: raise ValueError("eztrim: slices cannot end with 0, if attempting to use an empty slice, use `None`") if trims == (None, None): warn("eztrim: None, None slice will cause no trimming, quitting early", Warning) if debug: return locals() else: return outfile elif isinstance(trims, list): for trim in trims: if not isinstance(trim, tuple): raise TypeError(f"eztrim: the trim {trim} is not a tuple") if len(trim) != 2: raise ValueError(f"eztrim: the trim {trim} needs 2 elements") for i in trim: if not isinstance(i, (int, type(None))): raise TypeError(f"eztrim: the trim {trim} must have 2 ints or None's") if trim[-1] == 0: raise ValueError("eztrim: slices cannot end with 0, if attempting to use an empty slice, use `None`") # ------------------------------------------------------------------------------------------------------------------ num_frames = clip.num_frames ts = functools.partial(f2ts, timecodes_file=timecodes_file, src_clip=clip) ffmpeg_silence = [ffmpeg_path, "-hide_banner", "-loglevel", "16"] if quiet else [ffmpeg_path, "-hide_banner"] # --- single trim -------------------------------------------------------------------------------------------------- if isinstance(trims, tuple): start, end = _negative_to_positive(num_frames, *trims) if TYPE_CHECKING: assert isinstance(start, int) assert isinstance(end, int) if end <= start: raise ValueError("eztrim: the trim is not logical") args = ffmpeg_silence + ["-i", audio_file, "-vn", "-ss", ts(start), "-to", ts(end)] + codec_args + [outfile] if debug: return locals() else: run(args) return outfile # --- multiple trims with concatenation ---------------------------------------------------------------------------- starts, ends = _negative_to_positive(num_frames, [s for s, e in trims], [e for s, e in trims]) if TYPE_CHECKING: assert isinstance(starts, list) assert isinstance(ends, list) if not _check_ordered(starts, ends): raise ValueError("eztrim: the trims are not logical") if os.path.isfile("_acsuite_temp_concat.txt"): raise FileExistsError("eztrim: _acsuite_temp_concat.txt already exists, quitting") else: temp_filelist = [] if not debug: concat_file = open("_acsuite_temp_concat.txt", "w") times = zip([ts(f) for f in starts], [ts(f) for f in ends]) for key, time in enumerate(times): outfile_tmp = f"_acsuite_temp_output_{key}" + os.path.splitext(outfile)[-1] if not debug: concat_file.write(f"file {outfile_tmp}\n") temp_filelist.append(outfile_tmp) args = ffmpeg_silence + ["-i", audio_file, "-vn", "-ss", time[0], "-to", time[1]] + codec_args + [outfile_tmp] if not debug: run(args) if not debug: concat_file.close() args = ffmpeg_silence + ["-f", "concat", "-i", "_acsuite_temp_concat.txt", "-c", "copy", outfile] if debug: return locals() else: run(args) os.remove("_acsuite_temp_concat.txt") for file in temp_filelist: os.remove(file) return outfile
[docs]def f2ts(f: int, /, *, precision: int = 3, timecodes_file: Optional[str] = None, src_clip: vs.VideoNode) -> str: """ Converts frame number to a timestamp based on framerate. Can handle variable-frame-rate clips as well, using similar methods to that of ``vspipe --timecodes``. For VFR clips, will use a timecodes v2 file if given, else will fallback to the slower ``src_clip.frames()`` method. Meant to be called as a ``functools.partial`` with `src_clip` specified before-hand. :param f: Frame number (indexed from ``0``). Can be negative, indexing from the last frame of the `src_clip`. :param precision: An integer in ``[0, 3, 6, 9]`` representing the precision of the timestamp (second, millisecond, microsecond, nanosecond respectively). :param timecodes_file: An optional path to a v2 timecodes plaintext file for VFR clips (not used for CFR clips). If not given, will fallback to a `much` slower method of determining each frame's timestamp. :param src_clip: A VapourSynth clip for determining the timestamp. ``src_clip.fps`` is used for CFR clips, and the frame props (``_DurationNum`` and ``_DurationDen``) are used for VFR clips if a `timecodes_file` is not given. :return: A string representing the timestamp of the requested frame number. """ if precision not in [0, 3, 6, 9]: raise ValueError(f"f2ts: the precision {precision} must be a multiple of 3 (including 0)") if f < 0: f += src_clip.num_frames if f == 0: s = 0.0 elif src_clip.fps != fractions.Fraction(0, 1): t = round(float(10 ** 9 * f * src_clip.fps ** -1)) s = t / 10 ** 9 else: if timecodes_file is not None: timecodes = [float(x) / 1000 for x in open(timecodes_file, "r").read().splitlines()[1:]] s = timecodes[f] else: s = clip_to_timecodes(src_clip)[f] m = s // 60 s %= 60 h = m // 60 m %= 60 if precision == 0: return f"{h:02.0f}:{m:02.0f}:{round(s):02}" elif precision == 3: return f"{h:02.0f}:{m:02.0f}:{s:06.3f}" elif precision == 6: return f"{h:02.0f}:{m:02.0f}:{s:09.6f}" elif precision == 9: return f"{h:02.0f}:{m:02.0f}:{s:012.9f}"
[docs]@functools.lru_cache def clip_to_timecodes(src_clip: vs.VideoNode) -> Deque[float]: """ Cached function to return a list of timecodes for vfr clips. The first call to this function can be `very` expensive depending on the `src_clip` length and the source filter used. Subsequent calls on the same clip will return the previously generated list of timecodes. The timecodes are `floats` representing seconds from the start of the `src_clip`. If you have ``rich`` installed, will output a pretty progress bar as this process can take a long time. """ # fmt: off try: from rich.progress import track rich = True except ImportError: track = lambda x, description, total: x # type: ignore rich = False # fmt: on timecodes = collections.deque([0.0], maxlen=src_clip.num_frames + 1) curr_time = fractions.Fraction() init_percentage = 0 for frame in track(src_clip.frames(), description="Finding timestamps...", total=src_clip.num_frames): num = cast(int, frame.props["_DurationNum"]) den = cast(int, frame.props["_DurationDen"]) curr_time += fractions.Fraction(num, den) timecodes.append(float(curr_time)) if rich: pass # if ran in a normal console/terminal, should render a pretty progress bar else: percentage_done = round(100 * len(timecodes) / src_clip.num_frames) if percentage_done % 10 == 0 and percentage_done != init_percentage: print(rf"Finding timecodes for variable-framerate clip: {percentage_done}% done") init_percentage = percentage_done return timecodes
_Neg2pos_in = Union[List[Optional[int]], Optional[int]] _Neg2pos_out = Union[Tuple[List[int], List[int]], Tuple[int, int]] def _negative_to_positive(num_frames: int, a: _Neg2pos_in, b: _Neg2pos_in) -> _Neg2pos_out: """Changes negative/zero index to positive based on num_frames.""" single_trim = isinstance(a, (int, type(None))) and isinstance(b, (int, type(None))) # --- single trim -------------------------------------------------------------------------------------------------- if single_trim: a, b = (a or 0), (b or 0) if TYPE_CHECKING: assert isinstance(a, int) assert isinstance(b, int) if abs(a) > num_frames or abs(b) > num_frames: raise ValueError(f"_negative_to_positive: {max(abs(a), abs(b))} is out of bounds") return a if a >= 0 else num_frames + a, b if b > 0 else num_frames + b # --- multiple trims ----------------------------------------------------------------------------------------------- if TYPE_CHECKING: assert isinstance(a, list) assert isinstance(b, list) if len(a) != len(b): raise ValueError("_negative_to_positive: lists must be same length") real_a, real_b = [(i or 0) for i in a], [(i or 0) for i in b] # convert None to 0 if not (all(abs(i) <= num_frames for i in real_a) and all(abs(i) <= num_frames for i in real_b)): raise ValueError("_negative_to_positive: one or more trims are out of bounds") if all(i >= 0 for i in real_a) and all(i > 0 for i in real_b): return real_a, real_b positive_a = [x if x >= 0 else num_frames + x for x in real_a] positive_b = [y if y > 0 else num_frames + y for y in real_b] return positive_a, positive_b def _check_ordered(starts: List[int], ends: List[int]) -> bool: """Checks if lists follow logical Python slicing.""" if not all(starts[i] < ends[i] for i in range(len(starts))): return False if not all(ends[i] < starts[i + 1] for i in range(len(starts) - 1)): warn("_check_ordered: one or more trims will cause overlapping", Warning) return True
[docs]def concat( audio_files: List[str], outfile: str, *, ffmpeg_path: Optional[str] = None, quiet: bool = False, debug: bool = False ) -> Optional[Dict]: """Function to concatenate mutliple audio files. All audio files must have the same extension, and the outfile must have the same extension as the audio files. :param audio_files: List of strings representing audio file paths (i.e. ``['file1.wav', 'file2.wav']``). :param outfile: String representing desired filename for the concatenated audio. :param ffmpeg_path: Set this if ``ffmpeg`` is not in your `PATH`. If ``ffmpeg`` exists in your `PATH`, it will automatically be detected and used. :param quiet: Suppresses most console output from FFmpeg. """ # --- checking for ffmpeg ------------------------------------------------------------------------------------------ if ffmpeg_path is None: if not which("ffmpeg"): raise FileNotFoundError("concat: ffmpeg executable not found in PATH") else: ffmpeg_path = which("ffmpeg") else: if not os.path.isfile(ffmpeg_path): raise FileNotFoundError(f"concat: ffmpeg executable at {ffmpeg_path} not found") if TYPE_CHECKING: assert isinstance(ffmpeg_path, str) # --- checking for filename issues and file extension support ------------------------------------------------------ if len(audio_files) < 2: raise ValueError("concat: requires 2 or more audio files to concatenate") audio_file_extensions = set([os.path.splitext(af)[1] for af in audio_files] + [os.path.splitext(outfile)[1]]) if len(audio_file_extensions) > 1: raise ValueError("concat: all files must have the same extension") if (ext := audio_file_extensions.pop()) not in VALID_FFMPEG_EXTENSIONS: raise ValueError(f"concat: '{ext}' is not a valid extension recognized by any known FFmpeg encoders") for af in audio_files: if not os.path.isfile(af): raise FileNotFoundError(f"concat: {af} not found") if os.path.isfile(outfile): raise FileExistsError(f"concat: {outfile} already exists") # ------------------------------------------------------------------------------------------------------------------ ffmpeg_silence = [ffmpeg_path, "-hide_banner", "-loglevel", "16"] if quiet else [ffmpeg_path, "-hide_banner"] if os.path.isfile("_acsuite_temp_concat.txt"): raise FileExistsError("concat: _acsuite_temp_concat.txt already exists, quitting") if not debug: concat_file = open("_acsuite_temp_concat.txt", "w") for af in audio_files: concat_file.write(f"file {af}\n") concat_file.close() args = ffmpeg_silence + ["-f", "concat", "-i", "_acsuite_temp_concat.txt", "-c", "copy", outfile] if debug: return locals() run(args) os.remove("_acsuite_temp_concat.txt")