"""The interface between scenes and ffmpeg."""
from __future__ import annotations
__all__ = ["SceneFileWriter"]
import json
import shutil
import warnings
from fractions import Fraction
from pathlib import Path
from queue import Queue
from tempfile import NamedTemporaryFile, _TemporaryFileWrapper
from threading import Thread
from typing import TYPE_CHECKING, Any
import av
import numpy as np
import srt
from PIL import Image
# Manim handles audio conversion through PyAV directly. Importing pydub emits a
# RuntimeWarning if ffmpeg/avconv is not on PATH, even when only WAV code paths
# are used (which do not need ffmpeg). Silence this specific warning.
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore",
message=r".*ffmpeg or avconv.*",
category=RuntimeWarning,
)
from pydub import AudioSegment
from manim import __version__
from .. import config, logger
from .._config.logger_utils import set_file_logger
from ..constants import RendererType
from ..utils.file_ops import (
add_extension_if_not_present,
add_version_before_extension,
guarantee_existence,
is_gif_format,
is_png_format,
modify_atime,
write_to_movie,
)
from ..utils.sounds import get_full_sound_file_path
from .section import DefaultSectionType, Section
if TYPE_CHECKING:
from av.container.output import OutputContainer
from av.stream import Stream
from manim.renderer.cairo_renderer import CairoRenderer
from manim.renderer.opengl_renderer import OpenGLRenderer
from manim.typing import PixelArray, StrPath
[docs]
def to_av_frame_rate(fps: float) -> Fraction:
epsilon1 = 1e-4
epsilon2 = 0.02
if isinstance(fps, int):
(num, denom) = (fps, 1)
elif abs(fps - round(fps)) < epsilon1:
(num, denom) = (round(fps), 1)
else:
denom = 1001
num = round(fps * denom / 1000) * 1000
if abs(fps - num / denom) >= epsilon2:
raise ValueError("invalid frame rate")
return Fraction(num, denom)
[docs]
def convert_audio(
input_path: Path, output_path: Path | _TemporaryFileWrapper[bytes], codec_name: str
) -> None:
with (
av.open(input_path) as input_audio,
av.open(output_path, "w") as output_audio,
):
input_audio_stream = input_audio.streams.audio[0]
output_audio_stream = output_audio.add_stream(codec_name)
for frame in input_audio.decode(input_audio_stream):
for packet in output_audio_stream.encode(frame):
output_audio.mux(packet)
for packet in output_audio_stream.encode():
output_audio.mux(packet)
[docs]
class SceneFileWriter:
"""SceneFileWriter is the object that actually writes the animations
played, into video files, using FFMPEG.
This is mostly for Manim's internal use. You will rarely, if ever,
have to use the methods for this class, unless tinkering with the very
fabric of Manim's reality.
Attributes
----------
sections : list of :class:`.Section`
used to segment scene
sections_output_dir : :class:`pathlib.Path`
where are section videos stored
output_name : str
name of movie without extension and basis for section video names
Some useful attributes are:
"write_to_movie" (bool=False)
Whether or not to write the animations into a video file.
"movie_file_extension" (str=".mp4")
The file-type extension of the outputted video.
"partial_movie_files"
List of all the partial-movie files.
"""
force_output_as_scene_name = False
def __init__(
self,
renderer: CairoRenderer | OpenGLRenderer,
scene_name: str,
**kwargs: Any,
) -> None:
self.renderer = renderer
self.init_output_directories(scene_name)
self.init_audio()
self.frame_count = 0
self.partial_movie_files: list[str | None] = []
self.subcaptions: list[srt.Subtitle] = []
self.sections: list[Section] = []
# first section gets automatically created for convenience
# if you need the first section to be skipped, add a first section by hand, it will replace this one
self.next_section(
name="autocreated", type_=DefaultSectionType.NORMAL, skip_animations=False
)
[docs]
def init_output_directories(self, scene_name: str) -> None:
"""Initialise output directories.
Notes
-----
The directories are read from ``config``, for example
``config['media_dir']``. If the target directories don't already
exist, they will be created.
"""
if config["dry_run"]: # in dry-run mode there is no output
return
module_name = config.get_dir("input_file").stem if config["input_file"] else ""
if SceneFileWriter.force_output_as_scene_name:
self.output_name = Path(scene_name)
elif config["output_file"] and not config["write_all"]:
self.output_name = config.get_dir("output_file")
else:
self.output_name = Path(scene_name)
if config["media_dir"]:
image_dir = guarantee_existence(
config.get_dir(
"images_dir", module_name=module_name, scene_name=scene_name
),
)
self.image_file_path = image_dir / add_extension_if_not_present(
self.output_name, ".png"
)
if write_to_movie():
movie_dir = guarantee_existence(
config.get_dir(
"video_dir", module_name=module_name, scene_name=scene_name
),
)
self.movie_file_path = movie_dir / add_extension_if_not_present(
self.output_name, config["movie_file_extension"]
)
# TODO: /dev/null would be good in case sections_output_dir is used without being set (doesn't work on Windows), everyone likes defensive programming, right?
self.sections_output_dir = Path("")
if config.save_sections:
self.sections_output_dir = guarantee_existence(
config.get_dir(
"sections_dir", module_name=module_name, scene_name=scene_name
)
)
if is_gif_format():
self.gif_file_path = add_extension_if_not_present(
self.output_name, ".gif"
)
if not config["output_file"]:
self.gif_file_path = add_version_before_extension(
self.gif_file_path
)
self.gif_file_path = movie_dir / self.gif_file_path
self.partial_movie_directory = guarantee_existence(
config.get_dir(
"partial_movie_dir",
scene_name=scene_name,
module_name=module_name,
),
)
if config["log_to_file"]:
log_dir = guarantee_existence(config.get_dir("log_dir"))
set_file_logger(
scene_name=scene_name, module_name=module_name, log_dir=log_dir
)
[docs]
def finish_last_section(self) -> None:
"""Delete current section if it is empty."""
if len(self.sections) and self.sections[-1].is_empty():
self.sections.pop()
[docs]
def next_section(self, name: str, type_: str, skip_animations: bool) -> None:
"""Create segmentation cut here."""
self.finish_last_section()
# images don't support sections
section_video: str | None = None
# don't save when None
if (
not config.dry_run
and write_to_movie()
and config.save_sections
and not skip_animations
):
# relative to index file
section_video = f"{self.output_name}_{len(self.sections):04}_{name}{config.movie_file_extension}"
self.sections.append(
Section(
type_,
section_video,
name,
skip_animations,
),
)
[docs]
def add_partial_movie_file(self, hash_animation: str | None) -> None:
"""Adds a new partial movie file path to ``scene.partial_movie_files``
and current section from a hash.
This method will compute the path from the hash. In addition to that it
adds the new animation to the current section.
Parameters
----------
hash_animation
Hash of the animation.
"""
if not hasattr(self, "partial_movie_directory") or not write_to_movie():
return
# None has to be added to partial_movie_files to keep the right index with scene.num_plays.
# i.e if an animation is skipped, scene.num_plays is still incremented and we add an element to partial_movie_file be even with num_plays.
if hash_animation is None:
self.partial_movie_files.append(None)
self.sections[-1].partial_movie_files.append(None)
else:
new_partial_movie_file = str(
self.partial_movie_directory
/ f"{hash_animation}{config['movie_file_extension']}"
)
self.partial_movie_files.append(new_partial_movie_file)
self.sections[-1].partial_movie_files.append(new_partial_movie_file)
[docs]
def get_resolution_directory(self) -> str:
"""Get the name of the resolution directory directly containing
the video file.
This method gets the name of the directory that immediately contains the
video file. This name is ``<height_in_pixels_of_video>p<frame_rate>``.
For example, if you are rendering an 854x480 px animation at 15fps,
the name of the directory that immediately contains the video, file
will be ``480p15``.
The file structure should look something like::
MEDIA_DIR
|--Tex
|--texts
|--videos
|--<name_of_file_containing_scene>
|--<height_in_pixels_of_video>p<frame_rate>
|--partial_movie_files
|--<scene_name>.mp4
|--<scene_name>.srt
Returns
-------
:class:`str`
The name of the directory.
"""
pixel_height = config["pixel_height"]
frame_rate = config["frame_rate"]
return f"{pixel_height}p{frame_rate}"
# Sound
[docs]
def init_audio(self) -> None:
"""Preps the writer for adding audio to the movie."""
self.includes_sound = False
[docs]
def create_audio_segment(self) -> None:
"""Creates an empty, silent, Audio Segment."""
self.audio_segment = AudioSegment.silent()
[docs]
def add_audio_segment(
self,
new_segment: AudioSegment,
time: float | None = None,
gain_to_background: float | None = None,
) -> None:
"""This method adds an audio segment from an AudioSegment type object
and suitable parameters.
Parameters
----------
new_segment
The audio segment to add
time
the timestamp at which the sound should be added.
gain_to_background
The gain of the segment from the background.
"""
if not self.includes_sound:
self.includes_sound = True
self.create_audio_segment()
segment = self.audio_segment
curr_end = segment.duration_seconds
if time is None:
time = curr_end
if time < 0:
raise ValueError("Adding sound at timestamp < 0")
new_end = time + new_segment.duration_seconds
diff = new_end - curr_end
if diff > 0:
segment = segment.append(
AudioSegment.silent(int(np.ceil(diff * 1000))),
crossfade=0,
)
self.audio_segment = segment.overlay(
new_segment,
position=int(1000 * time),
gain_during_overlay=gain_to_background,
)
[docs]
def add_sound(
self,
sound_file: StrPath,
time: float | None = None,
gain: float | None = None,
**kwargs: Any,
) -> None:
"""This method adds an audio segment from a sound file.
Parameters
----------
sound_file
The path to the sound file.
time
The timestamp at which the audio should be added.
gain
The gain of the given audio segment.
**kwargs
This method uses add_audio_segment, so any keyword arguments
used there can be referenced here.
"""
file_path = get_full_sound_file_path(sound_file)
# we assume files with .wav / .raw suffix are actually
# .wav and .raw files, respectively.
if file_path.suffix not in (".wav", ".raw"):
# we need to pass delete=False to work on Windows
# TODO: figure out a way to cache the wav file generated (benchmark needed)
with NamedTemporaryFile(suffix=".wav", delete=False) as wav_file_path:
convert_audio(file_path, wav_file_path, "pcm_s16le")
new_segment = AudioSegment.from_file(wav_file_path.name)
logger.info(f"Automatically converted {file_path} to .wav")
Path(wav_file_path.name).unlink()
else:
new_segment = AudioSegment.from_file(file_path)
if gain:
new_segment = new_segment.apply_gain(gain)
self.add_audio_segment(new_segment, time, **kwargs)
# Writers
[docs]
def begin_animation(
self, allow_write: bool = False, file_path: StrPath | None = None
) -> None:
"""Used internally by manim to stream the animation to FFMPEG for
displaying or writing to a file.
Parameters
----------
allow_write
Whether or not to write to a video file.
"""
if write_to_movie() and allow_write:
self.open_partial_movie_stream(file_path=file_path)
[docs]
def end_animation(self, allow_write: bool = False) -> None:
"""Internally used by Manim to stop streaming to FFMPEG gracefully.
Parameters
----------
allow_write
Whether or not to write to a video file.
"""
if write_to_movie() and allow_write:
self.close_partial_movie_stream()
[docs]
def listen_and_write(self) -> None:
"""For internal use only: blocks until new frame is available on the queue."""
while True:
num_frames, frame_data = self.queue.get()
if frame_data is None:
break
self.encode_and_write_frame(frame_data, num_frames)
[docs]
def encode_and_write_frame(self, frame: PixelArray, num_frames: int) -> None:
"""For internal use only: takes a given frame in ``np.ndarray`` format and
writes it to the stream
"""
for _ in range(num_frames):
# Notes: precomputing reusing packets does not work!
# I.e., you cannot do `packets = encode(...)`
# and reuse it, as it seems that `mux(...)`
# consumes the packet.
# The same issue applies for `av_frame`,
# reusing it renders weird-looking frames.
av_frame = av.VideoFrame.from_ndarray(frame, format="rgba")
for packet in self.video_stream.encode(av_frame):
self.video_container.mux(packet)
[docs]
def write_frame(
self, frame_or_renderer: PixelArray | OpenGLRenderer, num_frames: int = 1
) -> None:
"""Used internally by Manim to write a frame to the FFMPEG input buffer.
Parameters
----------
frame_or_renderer
Pixel array of the frame.
num_frames
The number of times to write frame.
"""
if write_to_movie():
if isinstance(frame_or_renderer, np.ndarray):
frame = frame_or_renderer
else:
frame = (
frame_or_renderer.get_frame()
if config.renderer == RendererType.OPENGL
else frame_or_renderer
)
msg = (num_frames, frame)
self.queue.put(msg)
if is_png_format() and not config["dry_run"]:
if isinstance(frame_or_renderer, np.ndarray):
image = Image.fromarray(frame_or_renderer)
else:
image = (
frame_or_renderer.get_image()
if config.renderer == RendererType.OPENGL
else Image.fromarray(frame_or_renderer)
)
target_dir = self.image_file_path.parent / self.image_file_path.stem
extension = self.image_file_path.suffix
self.output_image(
image,
target_dir,
extension,
config["zero_pad"],
)
def output_image(
self, image: Image.Image, target_dir: StrPath, ext: str, zero_pad: int
) -> None:
if zero_pad:
image.save(f"{target_dir}{str(self.frame_count).zfill(zero_pad)}{ext}")
else:
image.save(f"{target_dir}{self.frame_count}{ext}")
self.frame_count += 1
[docs]
def save_image(self, image: Image.Image) -> None:
"""This method saves the image passed to it in the default image directory.
Parameters
----------
image
The pixel array of the image to save.
"""
if config["dry_run"]:
return
if not config["output_file"]:
self.image_file_path = add_version_before_extension(self.image_file_path)
image.save(self.image_file_path)
self.print_file_ready_message(self.image_file_path)
[docs]
def finish(self) -> None:
"""Finishes writing to the FFMPEG buffer or writing images to output directory.
Combines the partial movie files into the whole scene.
If save_last_frame is True, saves the last frame in the default image directory.
"""
if write_to_movie():
self.combine_to_movie()
if config.save_sections:
self.combine_to_section_videos()
if config["flush_cache"]:
self.flush_cache_directory()
else:
self.clean_cache()
elif is_png_format() and not config["dry_run"]:
target_dir = self.image_file_path.parent / self.image_file_path.stem
logger.info("\n%i images ready at %s\n", self.frame_count, str(target_dir))
if self.subcaptions:
self.write_subcaption_file()
[docs]
def open_partial_movie_stream(self, file_path: StrPath | None = None) -> None:
"""Open a container holding a video stream.
This is used internally by Manim initialize the container holding
the video stream of a partial movie file.
"""
if file_path is None:
file_path = self.partial_movie_files[self.renderer.num_plays]
self.partial_movie_file_path = file_path
fps = to_av_frame_rate(config.frame_rate)
partial_movie_file_codec = "libx264"
partial_movie_file_pix_fmt = "yuv420p"
av_options = {
"an": "1", # ffmpeg: -an, no audio
"crf": "23", # ffmpeg: -crf, constant rate factor (improved bitrate)
}
if config.movie_file_extension == ".webm":
partial_movie_file_codec = "libvpx-vp9"
av_options["-auto-alt-ref"] = "1"
if config.transparent:
partial_movie_file_pix_fmt = "yuva420p"
elif config.transparent:
partial_movie_file_codec = "qtrle"
partial_movie_file_pix_fmt = "argb"
video_container = av.open(file_path, mode="w")
stream = video_container.add_stream(
partial_movie_file_codec,
rate=fps,
options=av_options,
)
stream.pix_fmt = partial_movie_file_pix_fmt
stream.width = config.pixel_width
stream.height = config.pixel_height
self.video_container: OutputContainer = video_container
self.video_stream: Stream = stream
self.queue: Queue[tuple[int, PixelArray | None]] = Queue()
self.writer_thread = Thread(target=self.listen_and_write, args=())
self.writer_thread.start()
[docs]
def close_partial_movie_stream(self) -> None:
"""Close the currently opened video container.
Used internally by Manim to first flush the remaining packages
in the video stream holding a partial file, and then close
the corresponding container.
"""
self.queue.put((-1, None))
self.writer_thread.join()
for packet in self.video_stream.encode():
self.video_container.mux(packet)
self.video_container.close()
logger.info(
f"Animation {self.renderer.num_plays} : Partial movie file written in %(path)s",
{"path": f"'{self.partial_movie_file_path}'"},
)
[docs]
def is_already_cached(self, hash_invocation: str) -> bool:
"""Will check if a file named with `hash_invocation` exists.
Parameters
----------
hash_invocation
The hash corresponding to an invocation to either `scene.play` or `scene.wait`.
Returns
-------
:class:`bool`
Whether the file exists.
"""
if not hasattr(self, "partial_movie_directory") or not write_to_movie():
return False
path = (
self.partial_movie_directory
/ f"{hash_invocation}{config['movie_file_extension']}"
)
return path.exists()
def combine_files(
self,
input_files: list[str],
output_file: Path,
create_gif: bool = False,
includes_sound: bool = False,
) -> None:
file_list = self.partial_movie_directory / "partial_movie_file_list.txt"
logger.debug(
f"Partial movie files to combine ({len(input_files)} files): %(p)s",
{"p": input_files[:5]},
)
with file_list.open("w", encoding="utf-8") as fp:
fp.write("# This file is used internally by FFMPEG.\n")
for pf_path in input_files:
pf_path = Path(pf_path).as_posix()
fp.write(f"file 'file:{pf_path}'\n")
av_options = {
"safe": "0", # needed to read files
}
if not includes_sound:
av_options["an"] = "1"
partial_movies_input = av.open(
str(file_list), options=av_options, format="concat"
)
partial_movies_stream = partial_movies_input.streams.video[0]
output_container = av.open(str(output_file), mode="w")
output_container.metadata["comment"] = (
f"Rendered with Manim Community v{__version__}"
)
if create_gif:
"""The following solution was largely inspired from this comment
https://github.com/imageio/imageio/issues/995#issuecomment-1580533018,
and the following code
https://github.com/imageio/imageio/blob/65d79140018bb7c64c0692ea72cb4093e8d632a0/imageio/plugins/pyav.py#L927-L996.
"""
output_stream = output_container.add_stream(
codec_name="gif",
)
output_stream.pix_fmt = "rgb8"
if config.transparent:
output_stream.pix_fmt = "pal8"
output_stream.width = config.pixel_width
output_stream.height = config.pixel_height
output_stream.rate = to_av_frame_rate(config.frame_rate)
graph = av.filter.Graph()
input_buffer = graph.add_buffer(template=partial_movies_stream)
split = graph.add("split")
palettegen = graph.add("palettegen", "stats_mode=diff")
paletteuse = graph.add(
"paletteuse", "dither=bayer:bayer_scale=5:diff_mode=rectangle"
)
output_sink = graph.add("buffersink")
input_buffer.link_to(split)
split.link_to(palettegen, 0, 0) # 1st input of split -> input of palettegen
split.link_to(paletteuse, 1, 0) # 2nd output of split -> 1st input
palettegen.link_to(paletteuse, 0, 1) # output of palettegen -> 2nd input
paletteuse.link_to(output_sink)
graph.configure()
for frame in partial_movies_input.decode(video=0):
graph.push(frame)
graph.push(None) # EOF: https://github.com/PyAV-Org/PyAV/issues/886.
frames_written = 0
while True:
try:
frame = graph.pull()
if output_stream.codec_context.time_base is not None:
frame.time_base = output_stream.codec_context.time_base
frame.pts = frames_written
frames_written += 1
output_container.mux(output_stream.encode(frame))
except av.error.EOFError:
break
for packet in output_stream.encode():
output_container.mux(packet)
else:
output_stream = output_container.add_stream_from_template(
template=partial_movies_stream,
)
if config.transparent and config.movie_file_extension == ".webm":
output_stream.pix_fmt = "yuva420p"
for packet in partial_movies_input.demux(partial_movies_stream):
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
packet.dts = None # This seems to be needed, as dts from consecutive
# files may not be monotically increasing, so we let libav compute it.
# We need to assign the packet to the new stream.
packet.stream = output_stream
output_container.mux(packet)
partial_movies_input.close()
output_container.close()
[docs]
def combine_to_movie(self) -> None:
"""Used internally by Manim to combine the separate
partial movie files that make up a Scene into a single
video file for that Scene.
"""
partial_movie_files = [el for el in self.partial_movie_files if el is not None]
# NOTE: Here we should do a check and raise an exception if partial
# movie file is empty. We can't, as a lot of stuff (in particular, in
# tests) use scene initialization, and this error would be raised as
# it's just an empty scene initialized.
# determine output path
movie_file_path = self.movie_file_path
if is_gif_format():
movie_file_path = self.gif_file_path
if len(partial_movie_files) == 0: # Prevent calling concat on empty list
logger.info("No animations are contained in this scene.")
return
logger.info("Combining to Movie file.")
self.combine_files(
partial_movie_files,
movie_file_path,
is_gif_format(),
self.includes_sound,
)
# handle sound
if self.includes_sound and config.format != "gif":
sound_file_path = movie_file_path.with_suffix(".wav")
# Makes sure sound file length will match video file
self.add_audio_segment(AudioSegment.silent(0))
self.audio_segment.export(
sound_file_path,
format="wav",
bitrate="312k",
)
# Audio added to a VP9 encoded (webm) video file needs
# to be encoded as vorbis or opus. Directly exporting
# self.audio_segment with such a codec works in principle,
# but tries to call ffmpeg via its CLI -- which we want
# to avoid. This is why we need to do the conversion
# manually.
if config.movie_file_extension == ".webm":
ogg_sound_file_path = sound_file_path.with_suffix(".ogg")
convert_audio(sound_file_path, ogg_sound_file_path, "libvorbis")
sound_file_path = ogg_sound_file_path
elif config.movie_file_extension == ".mp4":
# Similarly, pyav may reject wav audio in an .mp4 file;
# convert to AAC.
aac_sound_file_path = sound_file_path.with_suffix(".aac")
convert_audio(sound_file_path, aac_sound_file_path, "aac")
sound_file_path = aac_sound_file_path
temp_file_path = movie_file_path.with_name(
f"{movie_file_path.stem}_temp{movie_file_path.suffix}"
)
av_options = {
"shortest": "1",
"metadata": f"comment=Rendered with Manim Community v{__version__}",
}
with (
av.open(movie_file_path) as video_input,
av.open(sound_file_path) as audio_input,
):
video_stream = video_input.streams.video[0]
audio_stream = audio_input.streams.audio[0]
output_container = av.open(
str(temp_file_path), mode="w", options=av_options
)
output_video_stream = output_container.add_stream_from_template(
template=video_stream
)
output_audio_stream = output_container.add_stream_from_template(
template=audio_stream
)
for packet in video_input.demux(video_stream):
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
# We need to assign the packet to the new stream.
packet.stream = output_video_stream
output_container.mux(packet)
for packet in audio_input.demux(audio_stream):
# We need to skip the "flushing" packets that `demux` generates.
if packet.dts is None:
continue
# We need to assign the packet to the new stream.
packet.stream = output_audio_stream
output_container.mux(packet)
output_container.close()
shutil.move(str(temp_file_path), str(movie_file_path))
sound_file_path.unlink()
self.print_file_ready_message(str(movie_file_path))
if write_to_movie():
for file_path in partial_movie_files:
# We have to modify the accessed time so if we have to clean the cache we remove the one used the longest.
modify_atime(file_path)
[docs]
def combine_to_section_videos(self) -> None:
"""Concatenate partial movie files for each section."""
self.finish_last_section()
sections_index: list[dict[str, Any]] = []
for section in self.sections:
# only if section does want to be saved
if section.video is not None:
logger.info(f"Combining partial files for section '{section.name}'")
self.combine_files(
section.get_clean_partial_movie_files(),
self.sections_output_dir / section.video,
)
sections_index.append(section.get_dict(self.sections_output_dir))
with (self.sections_output_dir / f"{self.output_name}.json").open("w") as file:
json.dump(sections_index, file, indent=4)
[docs]
def clean_cache(self) -> None:
"""Will clean the cache by removing the oldest partial_movie_files."""
cached_partial_movies = [
(self.partial_movie_directory / file_name)
for file_name in self.partial_movie_directory.iterdir()
if file_name != "partial_movie_file_list.txt"
]
if len(cached_partial_movies) > config["max_files_cached"]:
number_files_to_delete = (
len(cached_partial_movies) - config["max_files_cached"]
)
oldest_files_to_delete = sorted(
cached_partial_movies,
key=lambda path: path.stat().st_atime,
)[:number_files_to_delete]
for file_to_delete in oldest_files_to_delete:
file_to_delete.unlink()
logger.info(
f"The partial movie directory is full (> {config['max_files_cached']} files). Therefore, manim has removed the {number_files_to_delete} oldest file(s)."
" You can change this behaviour by changing max_files_cached in config.",
)
[docs]
def flush_cache_directory(self) -> None:
"""Delete all the cached partial movie files"""
cached_partial_movies = [
self.partial_movie_directory / file_name
for file_name in self.partial_movie_directory.iterdir()
if file_name != "partial_movie_file_list.txt"
]
for f in cached_partial_movies:
f.unlink()
logger.info(
f"Cache flushed. {len(cached_partial_movies)} file(s) deleted in %(par_dir)s.",
{"par_dir": self.partial_movie_directory},
)
[docs]
def write_subcaption_file(self) -> None:
"""Writes the subcaption file."""
if config.output_file is None:
return
subcaption_file = Path(config.output_file).with_suffix(".srt")
subcaption_file.write_text(srt.compose(self.subcaptions), encoding="utf-8")
logger.info(f"Subcaption file has been written as {subcaption_file}")
[docs]
def print_file_ready_message(self, file_path: StrPath) -> None:
"""Prints the "File Ready" message to STDOUT."""
config["output_file"] = file_path
logger.info("\nFile ready at %(file_path)s\n", {"file_path": f"'{file_path}'"})