# Source code for manim.scene.scene_file_writer

"""The interface between scenes and ffmpeg."""

from __future__ import annotations

__all__ = ["SceneFileWriter"]

import json
import os
import shutil
import subprocess
from pathlib import Path
from typing import Any

import numpy as np
import srt
from PIL import Image
from pydub import AudioSegment

from manim import __version__

from .. import config, logger
from .._config.logger_utils import set_file_logger
from ..constants import FFMPEG_BIN, GIF_FILE_EXTENSION
from ..utils.file_ops import (
    add_extension_if_not_present,
    add_version_before_extension,
    guarantee_existence,
    is_gif_format,
    is_png_format,
    is_webm_format,
    modify_atime,
    write_to_movie,
)
from ..utils.sounds import get_full_sound_file_path
from .section import DefaultSectionType, Section


class SceneFileWriter:
    """Write the animations played by a scene into video files using FFMPEG.

    This is mostly for Manim's internal use. You will rarely, if ever, have
    to use the methods of this class, unless tinkering with the very fabric
    of Manim's reality.

    Attributes
    ----------
    renderer
        The renderer passed to the constructor; its ``num_plays`` is used to
        pick the current partial movie file.
    sections : list of :class:`.Section`
        Used to segment the scene.
    sections_output_dir : str
        Where section videos are stored.
    output_name : str
        Name of the movie without extension and basis for section video names.
    partial_movie_files : list
        Paths of all partial movie files, with ``None`` placeholders for
        skipped animations.
    subcaptions : list of :class:`srt.Subtitle`
        Subcaptions that :meth:`finish` writes to an ``.srt`` file.
    frame_count : int
        Number of still images written so far by :meth:`output_image`.
    includes_sound : bool
        Whether any audio has been added to the scene.
    """

    force_output_as_scene_name = False

    # Name of the ffmpeg concat list that lives next to the cached partial
    # movies; it must never be treated as a cache entry.
    _PARTIAL_MOVIE_LIST_NAME = "partial_movie_file_list.txt"

    def __init__(self, renderer, scene_name, **kwargs):
        self.renderer = renderer
        self.init_output_directories(scene_name)
        self.init_audio()
        self.frame_count = 0
        self.partial_movie_files: list[str | None] = []
        self.subcaptions: list[srt.Subtitle] = []
        self.sections: list[Section] = []
        # first section gets automatically created for convenience
        # if you need the first section to be skipped, add a first section by hand, it will replace this one
        self.next_section(
            name="autocreated", type=DefaultSectionType.NORMAL, skip_animations=False
        )

    def init_output_directories(self, scene_name):
        """Initialise output directories.

        Parameters
        ----------
        scene_name
            Name of the scene; used for the directory lookups and, unless an
            output file is configured, as the output name.

        Notes
        -----
        The directories are read from ``config``, for example
        ``config['media_dir']``. If the target directories don't already
        exist, they will be created.
        """
        if config["dry_run"]:  # in dry-run mode there is no output
            return

        if config["input_file"]:
            module_name = config.get_dir("input_file").stem
        else:
            module_name = ""

        if SceneFileWriter.force_output_as_scene_name:
            self.output_name = Path(scene_name)
        elif config["output_file"] and not config["write_all"]:
            self.output_name = config.get_dir("output_file")
        else:
            self.output_name = Path(scene_name)

        if config["media_dir"]:
            image_dir = guarantee_existence(
                config.get_dir(
                    "images_dir", module_name=module_name, scene_name=scene_name
                ),
            )
            self.image_file_path = image_dir / add_extension_if_not_present(
                self.output_name, ".png"
            )

        if write_to_movie():
            movie_dir = guarantee_existence(
                config.get_dir(
                    "video_dir", module_name=module_name, scene_name=scene_name
                ),
            )
            self.movie_file_path = movie_dir / add_extension_if_not_present(
                self.output_name, config["movie_file_extension"]
            )

            # TODO: /dev/null would be good in case sections_output_dir is used
            # without being set (doesn't work on Windows), everyone likes
            # defensive programming, right?
            self.sections_output_dir = ""
            if config.save_sections:
                self.sections_output_dir = guarantee_existence(
                    config.get_dir(
                        "sections_dir", module_name=module_name, scene_name=scene_name
                    )
                )

            if is_gif_format():
                self.gif_file_path = add_extension_if_not_present(
                    self.output_name, GIF_FILE_EXTENSION
                )

                if not config["output_file"]:
                    self.gif_file_path = add_version_before_extension(
                        self.gif_file_path
                    )

                self.gif_file_path = movie_dir / self.gif_file_path

            self.partial_movie_directory = guarantee_existence(
                config.get_dir(
                    "partial_movie_dir",
                    scene_name=scene_name,
                    module_name=module_name,
                ),
            )

        if config["log_to_file"]:
            log_dir = guarantee_existence(config.get_dir("log_dir"))
            set_file_logger(
                scene_name=scene_name, module_name=module_name, log_dir=log_dir
            )

    def finish_last_section(self) -> None:
        """Delete current section if it is empty."""
        if self.sections and self.sections[-1].is_empty():
            self.sections.pop()

    def next_section(self, name: str, type: str, skip_animations: bool) -> None:
        """Create segmentation cut here.

        Parameters
        ----------
        name
            Name given to the new section.
        type
            Section type forwarded to :class:`.Section`.
        skip_animations
            Whether the animations of this section are skipped; a skipped
            section gets no section video.
        """
        self.finish_last_section()

        # images don't support sections
        section_video: str | None = None
        # don't save when None
        if (
            not config.dry_run
            and write_to_movie()
            and config.save_sections
            and not skip_animations
        ):
            # relative to index file
            section_video = f"{self.output_name}_{len(self.sections):04}{config.movie_file_extension}"

        self.sections.append(
            Section(
                type,
                section_video,
                name,
                skip_animations,
            ),
        )

    def add_partial_movie_file(self, hash_animation):
        """Adds a new partial movie file path to `scene.partial_movie_files` and current section from a hash.

        This method will compute the path from the hash. In addition to that
        it adds the new animation to the current section.

        Parameters
        ----------
        hash_animation : str
            Hash of the animation, or ``None`` for a skipped animation.
        """
        if not hasattr(self, "partial_movie_directory") or not write_to_movie():
            return

        # None has to be added to partial_movie_files to keep the right index with scene.num_plays.
        # i.e if an animation is skipped, scene.num_plays is still incremented and we add an element
        # to partial_movie_file be even with num_plays.
        if hash_animation is None:
            self.partial_movie_files.append(None)
            self.sections[-1].partial_movie_files.append(None)
        else:
            new_partial_movie_file = str(
                self.partial_movie_directory
                / f"{hash_animation}{config['movie_file_extension']}"
            )
            self.partial_movie_files.append(new_partial_movie_file)
            self.sections[-1].partial_movie_files.append(new_partial_movie_file)

    def get_resolution_directory(self) -> str:
        """Get the name of the resolution directory directly containing the video file.

        This method gets the name of the directory that immediately contains
        the video file. This name is
        ``<height_in_pixels_of_video>p<frame_rate>``. For example, if you are
        rendering an 854x480 px animation at 15fps, the name of the directory
        that immediately contains the video file will be ``480p15``.

        The file structure should look something like::

            MEDIA_DIR
                |--Tex
                |--texts
                |--videos
                |--<name_of_file_containing_scene>
                    |--<height_in_pixels_of_video>p<frame_rate>
                        |--<scene_name>.mp4

        Returns
        -------
        :class:`str`
            The name of the directory.
        """
        pixel_height = config["pixel_height"]
        frame_rate = config["frame_rate"]
        return f"{pixel_height}p{frame_rate}"

    # Sound
    def init_audio(self) -> None:
        """Preps the writer for adding audio to the movie."""
        self.includes_sound = False

    def create_audio_segment(self) -> None:
        """Creates an empty, silent, Audio Segment."""
        self.audio_segment = AudioSegment.silent()

    def add_audio_segment(self, new_segment, time=None, gain_to_background=None):
        """Add an audio segment to the scene's audio track.

        Parameters
        ----------
        new_segment : AudioSegment
            The audio segment to add.
        time : int, float, optional
            The timestamp (in seconds) at which the sound should be added.
            Defaults to the current end of the audio track.
        gain_to_background : optional
            The gain of the segment from the background.

        Raises
        ------
        ValueError
            If ``time`` is negative.
        """
        if not self.includes_sound:
            self.includes_sound = True
            self.create_audio_segment()
        segment = self.audio_segment
        curr_end = segment.duration_seconds
        if time is None:
            time = curr_end
        if time < 0:
            raise ValueError("Adding sound at timestamp < 0")

        new_end = time + new_segment.duration_seconds
        diff = new_end - curr_end
        if diff > 0:
            # Pad the track with silence so the overlay fits entirely.
            segment = segment.append(
                AudioSegment.silent(int(np.ceil(diff * 1000))),
                crossfade=0,
            )
        self.audio_segment = segment.overlay(
            new_segment,
            position=int(1000 * time),
            gain_during_overlay=gain_to_background,
        )

    def add_sound(self, sound_file, time=None, gain=None, **kwargs):
        """Add an audio segment loaded from a sound file.

        Parameters
        ----------
        sound_file : str
            The path to the sound file.
        time : float or int, optional
            The timestamp at which the audio should be added.
        gain : optional
            The gain of the given audio segment.
        **kwargs
            This method uses add_audio_segment, so any keyword arguments
            used there can be referenced here.
        """
        file_path = get_full_sound_file_path(sound_file)
        new_segment = AudioSegment.from_file(file_path)
        if gain:
            new_segment = new_segment.apply_gain(gain)
        self.add_audio_segment(new_segment, time, **kwargs)

    # Writers
    def begin_animation(self, allow_write=False, file_path=None):
        """Used internally by manim to stream the animation to FFMPEG for
        displaying or writing to a file.

        Parameters
        ----------
        allow_write : bool, optional
            Whether or not to write to a video file.
        file_path : optional
            Forwarded to :meth:`open_movie_pipe`.
        """
        if write_to_movie() and allow_write:
            self.open_movie_pipe(file_path=file_path)

    def end_animation(self, allow_write=False):
        """Internally used by Manim to stop streaming to FFMPEG gracefully.

        Parameters
        ----------
        allow_write : bool, optional
            Whether or not to write to a video file.
        """
        if write_to_movie() and allow_write:
            self.close_movie_pipe()

    def write_frame(self, frame_or_renderer):
        """Used internally by Manim to write a frame to the FFMPEG input buffer.

        Parameters
        ----------
        frame_or_renderer
            With the ``opengl`` renderer this is the renderer, which is
            handed to :meth:`write_opengl_frame`; otherwise it is the pixel
            array of the frame.
        """
        if config.renderer == "opengl":
            self.write_opengl_frame(frame_or_renderer)
        else:
            frame = frame_or_renderer
            if write_to_movie():
                self.writing_process.stdin.write(frame.tobytes())
            if is_png_format() and not config["dry_run"]:
                self.output_image_from_array(frame)

    def write_opengl_frame(self, renderer):
        """Write the renderer's current frame to FFMPEG or to an image file."""
        if write_to_movie():
            self.writing_process.stdin.write(
                renderer.get_raw_frame_buffer_object_data(),
            )
        elif is_png_format() and not config["dry_run"]:
            target_dir = self.image_file_path.parent / self.image_file_path.stem
            extension = self.image_file_path.suffix
            self.output_image(
                renderer.get_image(),
                target_dir,
                extension,
                config["zero_pad"],
            )

    def output_image_from_array(self, frame_data):
        """Convert a pixel array to an image and write it as the next frame image."""
        target_dir = self.image_file_path.parent / self.image_file_path.stem
        extension = self.image_file_path.suffix
        self.output_image(
            Image.fromarray(frame_data),
            target_dir,
            extension,
            config["zero_pad"],
        )

    def output_image(self, image: Image.Image, target_dir, ext, zero_pad: int):
        """Save ``image`` under a frame-numbered name and advance the counter.

        Parameters
        ----------
        image
            The image to save.
        target_dir
            Prefix of the output file name; the frame number is appended
            directly to it.
        ext
            File extension, including the leading dot.
        zero_pad
            Width the frame number is zero-padded to; a falsy value disables
            padding.
        """
        frame_number = str(self.frame_count)
        if zero_pad:
            frame_number = frame_number.zfill(zero_pad)
        image.save(f"{target_dir}{frame_number}{ext}")
        self.frame_count += 1

    def save_final_image(self, image):
        """Save the given image at ``self.image_file_path``.

        The name is a misnomer: whatever image is passed in gets saved. When
        no output file is configured, the version is added before the file
        extension first.

        Parameters
        ----------
        image
            The image to save; must provide a ``save(path)`` method.
        """
        if config["dry_run"]:
            return
        if not config["output_file"]:
            self.image_file_path = add_version_before_extension(self.image_file_path)

        image.save(self.image_file_path)
        self.print_file_ready_message(self.image_file_path)

    def finish(self):
        """Finishes writing to the FFMPEG buffer or writing images to output
        directory.

        Combines the partial movie files into the whole scene, combines the
        section videos if requested, then flushes or trims the cache.
        Finally writes the subcaption file if there are any subcaptions.
        """
        if write_to_movie():
            if hasattr(self, "writing_process"):
                self.writing_process.terminate()
            self.combine_to_movie()
            if config.save_sections:
                self.combine_to_section_videos()
            if config["flush_cache"]:
                self.flush_cache_directory()
            else:
                self.clean_cache()
        elif is_png_format() and not config["dry_run"]:
            target_dir = self.image_file_path.parent / self.image_file_path.stem
            logger.info("\n%i images ready at %s\n", self.frame_count, str(target_dir))
        if self.subcaptions:
            self.write_subcaption_file()

    def open_movie_pipe(self, file_path=None):
        """Used internally by Manim to initialise FFMPEG and begin writing to
        FFMPEG's input buffer.

        Parameters
        ----------
        file_path : optional
            Output path of the partial movie. Defaults to the entry of
            ``partial_movie_files`` at index ``renderer.num_plays``.
        """
        if file_path is None:
            file_path = self.partial_movie_files[self.renderer.num_plays]
        self.partial_movie_file_path = file_path

        fps = config["frame_rate"]
        if fps == int(fps):  # fps is integer
            fps = int(fps)
        if config.renderer == "opengl":
            width, height = self.renderer.get_pixel_shape()
        else:
            height = config["pixel_height"]
            width = config["pixel_width"]

        command = [
            FFMPEG_BIN,
            "-y",  # overwrite output file if it exists
            "-f",
            "rawvideo",
            "-s",
            "%dx%d" % (width, height),  # size of one frame
            "-pix_fmt",
            "rgba",
            "-r",
            str(fps),  # frames per second
            "-i",
            "-",  # The input comes from a pipe
            "-an",  # Tells FFMPEG not to expect any audio
            "-loglevel",
            config["ffmpeg_loglevel"].lower(),
            "-metadata",
            f"comment=Rendered with Manim Community v{__version__}",
        ]
        if config.renderer == "opengl":
            command += ["-vf", "vflip"]
        if is_webm_format():
            command += ["-vcodec", "libvpx-vp9", "-auto-alt-ref", "0"]
        # .mov format
        elif config["transparent"]:
            command += ["-vcodec", "qtrle"]
        else:
            command += ["-vcodec", "libx264", "-pix_fmt", "yuv420p"]
        command += [file_path]
        self.writing_process = subprocess.Popen(command, stdin=subprocess.PIPE)

    def close_movie_pipe(self):
        """Used internally by Manim to gracefully stop writing to FFMPEG's input buffer."""
        self.writing_process.stdin.close()
        self.writing_process.wait()

        logger.info(
            f"Animation {self.renderer.num_plays} : Partial movie file written in %(path)s",
            {"path": f"'{self.partial_movie_file_path}'"},
        )

    def is_already_cached(self, hash_invocation) -> bool:
        """Will check if a file named with `hash_invocation` exists.

        Parameters
        ----------
        hash_invocation : :class:`str`
            The hash corresponding to an invocation to either `scene.play` or `scene.wait`.

        Returns
        -------
        :class:`bool`
            Whether the file exists.
        """
        if not hasattr(self, "partial_movie_directory") or not write_to_movie():
            return False
        path = (
            self.partial_movie_directory
            / f"{hash_invocation}{config['movie_file_extension']}"
        )
        return path.exists()

    def combine_files(
        self,
        input_files: list[str],
        output_file: Path | str,
        create_gif=False,
        includes_sound=False,
    ):
        """Concatenate ``input_files`` into ``output_file`` with FFMPEG.

        The input paths are first written to a list file inside the partial
        movie directory, which FFMPEG's ``concat`` input format then reads.

        Parameters
        ----------
        input_files
            Paths of the files to concatenate, in order.
        output_file
            Path of the combined file.
        create_gif
            Apply the GIF palette filter instead of copying the streams.
        includes_sound
            When false, ``-an`` is passed so no audio is written.
        """
        file_list = str(self.partial_movie_directory / self._PARTIAL_MOVIE_LIST_NAME)
        logger.debug(
            f"Partial movie files to combine ({len(input_files)} files): %(p)s",
            {"p": input_files[:5]},
        )
        with open(file_list, "w", encoding="utf-8") as fp:
            fp.write("# This file is used internally by FFMPEG.\n")
            for pf_path in input_files:
                pf_path = str(pf_path)
                if os.name == "nt":
                    pf_path = pf_path.replace("\\", "/")
                fp.write(f"file 'file:{pf_path}'\n")
        commands = [
            FFMPEG_BIN,
            "-y",  # overwrite output file if it exists
            "-f",
            "concat",
            "-safe",
            "0",
            "-i",
            file_list,
            "-loglevel",
            config.ffmpeg_loglevel.lower(),
            "-metadata",
            f"comment=Rendered with Manim Community v{__version__}",
            "-nostdin",
        ]

        if create_gif:
            commands += [
                "-vf",
                f"fps={np.clip(config['frame_rate'], 1, 50)},split[s0][s1];[s0]palettegen=stats_mode=diff[p];[s1][p]paletteuse=dither=bayer:bayer_scale=5:diff_mode=rectangle",
            ]
        else:
            commands += ["-c", "copy"]

        if not includes_sound:
            commands += ["-an"]

        commands += [str(output_file)]

        combine_process = subprocess.Popen(commands)
        combine_process.wait()

    @staticmethod
    def _sound_and_temp_paths(movie_file_path: str) -> tuple[str, str]:
        """Return the ``.wav`` and ``_temp`` sibling paths of a movie file.

        Only the final path component is touched, so an extension-like
        substring elsewhere in the path (for example in a directory name) is
        left alone.
        """
        movie_path = Path(movie_file_path)
        sound_path = movie_path.with_suffix(".wav")
        temp_path = movie_path.with_name(f"{movie_path.stem}_temp{movie_path.suffix}")
        return str(sound_path), str(temp_path)

    def combine_to_movie(self):
        """Used internally by Manim to combine the separate partial movie
        files that make up a Scene into a single video file for that Scene.

        When sound was added, the audio track is exported to a temporary
        ``.wav`` file, muxed with the video by FFMPEG into a temporary movie
        file, and the result is moved over the combined movie.
        """
        partial_movie_files = [el for el in self.partial_movie_files if el is not None]
        # NOTE: Here we should do a check and raise an exception if partial
        # movie file is empty. We can't, as a lot of stuff (in particular, in
        # tests) use scene initialization, and this error would be raised as
        # it's just an empty scene initialized.

        # determine output path
        movie_file_path = self.movie_file_path
        if is_gif_format():
            movie_file_path = self.gif_file_path
        movie_file_path = str(movie_file_path)
        logger.info("Combining to Movie file.")
        self.combine_files(
            partial_movie_files,
            movie_file_path,
            is_gif_format(),
            self.includes_sound,
        )

        # handle sound
        if self.includes_sound:
            sound_file_path, temp_file_path = self._sound_and_temp_paths(
                movie_file_path
            )
            # Makes sure sound file length will match video file
            self.add_audio_segment(AudioSegment.silent(0))
            self.audio_segment.export(
                sound_file_path,
                bitrate="312k",
            )
            commands = [
                FFMPEG_BIN,
                "-i",
                movie_file_path,
                "-i",
                sound_file_path,
                "-y",  # overwrite output file if it exists
                "-c:v",
                "copy",
                "-c:a",
                "aac",
                "-b:a",
                "320k",
                # select video stream from first file
                "-map",
                "0:v:0",
                # select audio stream from second file
                "-map",
                "1:a:0",
                "-loglevel",
                config.ffmpeg_loglevel.lower(),
                "-metadata",
                f"comment=Rendered with Manim Community v{__version__}",
                # "-shortest",
                temp_file_path,
            ]
            subprocess.call(commands)
            shutil.move(temp_file_path, movie_file_path)
            os.remove(sound_file_path)

        self.print_file_ready_message(movie_file_path)
        if write_to_movie():
            for file_path in partial_movie_files:
                # We have to modify the accessed time so if we have to clean the cache we remove the one used the longest.
                modify_atime(file_path)

    def combine_to_section_videos(self) -> None:
        """Concatenate partial movie files for each section.

        Also writes a JSON index of the saved sections, named after
        ``output_name``, into ``sections_output_dir``.
        """
        self.finish_last_section()
        sections_index: list[dict[str, Any]] = []
        for section in self.sections:
            # only if section does want to be saved
            if section.video is not None:
                logger.info(f"Combining partial files for section '{section.name}'")
                self.combine_files(
                    section.get_clean_partial_movie_files(),
                    os.path.join(self.sections_output_dir, section.video),
                )
                sections_index.append(section.get_dict(self.sections_output_dir))
        index_path = os.path.join(self.sections_output_dir, f"{self.output_name}.json")
        with open(index_path, "w", encoding="utf-8") as file:
            json.dump(sections_index, file, indent=4)

    def _cached_partial_movie_files(self) -> list[Path]:
        """Return every entry of the partial movie directory except the concat list file."""
        # ``iterdir`` yields ``Path`` objects that already include the
        # directory, so the entry has to be identified by its ``name``;
        # comparing the ``Path`` itself with a string is never equal.
        return [
            entry
            for entry in self.partial_movie_directory.iterdir()
            if entry.name != self._PARTIAL_MOVIE_LIST_NAME
        ]

    def clean_cache(self):
        """Will clean the cache by removing the oldest partial_movie_files.

        Nothing is removed unless the number of cached files exceeds
        ``config["max_files_cached"]``; "oldest" is judged by access time.
        """
        cached_partial_movies = self._cached_partial_movie_files()
        if len(cached_partial_movies) > config["max_files_cached"]:
            number_files_to_delete = (
                len(cached_partial_movies) - config["max_files_cached"]
            )
            oldest_files_to_delete = sorted(
                cached_partial_movies,
                key=os.path.getatime,
            )[:number_files_to_delete]
            for file_to_delete in oldest_files_to_delete:
                file_to_delete.unlink()
            logger.info(
                f"The partial movie directory is full (> {config['max_files_cached']} files). Therefore, manim has removed the {number_files_to_delete} oldest file(s)."
                " You can change this behaviour by changing max_files_cached in config.",
            )

    def flush_cache_directory(self):
        """Delete all the cached partial movie files."""
        cached_partial_movies = self._cached_partial_movie_files()
        for f in cached_partial_movies:
            f.unlink()
        logger.info(
            f"Cache flushed. {len(cached_partial_movies)} file(s) deleted in %(par_dir)s.",
            {"par_dir": self.partial_movie_directory},
        )

    def write_subcaption_file(self):
        """Writes the subcaption file.

        The file is placed next to ``config.output_file`` with an ``.srt``
        suffix. If no output file has been recorded there is nowhere to put
        it, so a warning is logged and nothing is written.
        """
        if not config.output_file:
            logger.warning("No output file is set; the subcaption file was not written.")
            return
        subcaption_file = Path(config.output_file).with_suffix(".srt")
        # Subtitles routinely contain non-ASCII text; don't depend on the locale encoding.
        with open(subcaption_file, "w", encoding="utf-8") as f:
            f.write(srt.compose(self.subcaptions))
        logger.info(f"Subcaption file has been written as {subcaption_file}")

    def print_file_ready_message(self, file_path):
        """Log the "File Ready" message and record ``file_path`` as ``config["output_file"]``."""
        config["output_file"] = file_path
        logger.info("\nFile ready at %(file_path)s\n", {"file_path": f"'{file_path}'"})