""" Version: 4.1.0 Author: Salty Usage: See help `python optimizelibrary.py --help` Requirements: 1) Make sure to install mutagen with `pip install mutagen`. 2) Make sure to have the latest versions of the following programs added to PATH: * flac (https://xiph.org/flac/download.html) * metaflac (https://xiph.org/flac/download.html) * oxipng (https://github.com/shssoichiro/oxipng) * libjxl (https://github.com/libjxl/libjxl) """ import argparse from functools import partial import math import mimetypes import os import platform import shutil import signal import subprocess import traceback from argparse import ArgumentDefaultsHelpFormatter, ArgumentParser, ArgumentTypeError, Namespace from multiprocessing import Pool from pathlib import Path, PurePath from threading import Event from typing import Iterator from mutagen.flac import FLAC from mutagen.id3 import PictureType class SuccessResult: def __init__(self, path: Path, old_size: int, new_size: int): self.path = path self.savings = old_size - new_size def __str__(self) -> str: return f"{res.path.as_posix()}\t{savings}\n" class FailureResult: def __init__(self, path: Path, msg: str): self.path = path self.msg = msg.replace("\r\n", "\n").strip() def __str__(self) -> str: return f"{res.path.as_posix()}\n\n{res.msg}\n\n" """Utils""" def is_arg_folder(value: str): if not Path(value).is_dir(): raise ArgumentTypeError(f"invalid folder: {value}") return value def format_size(value): for mag, unit in enumerate([ "B", "KiB", "MiB", "GiB", "TiB" ]): if abs(value) < 1024.0: return f"{value:.0f} {unit}" if mag == 0 else f"{value:.2f} {unit}" value /= 1024 return f"{value:.1f} PiB" def walk(path: str, cancel: Event) -> Iterator[Path]: for path, _, files in os.walk(path): if cancel.is_set(): return for file in files: file_path = Path(path).joinpath(file) if file_path.suffix in [ ".jpg", ".jpeg", ".png", ".flac", ".log", ".txt" ]: yield file_path def run(args: list[str]) -> bool: if platform.system() == "Windows": res = subprocess.run(args, capture_output=True, timeout=300, creationflags=subprocess.CREATE_NEW_PROCESS_GROUP) else: res = subprocess.run(args, capture_output=True, timeout=300, preexec_fn=os.setpgrp) if res.returncode != 0: raise Exception(f"Failed to run command: {' '.join(args)}") """Worker""" def work_initializer(): signal.signal(signal.SIGINT, signal.SIG_IGN) def work_handler(args: Namespace, path: str): try: return work(args, path) except Exception as ex: return FailureResult(path, "".join(traceback.format_exception(type(ex), ex, ex.__traceback__))) def work(args: Namespace, path: Path): old_size = path.stat().st_size new_size = old_size if path.suffix == ".jpg" or path.suffix == ".jpeg": if args.convert_jpegxl_from_jpg and path.parent.name.lower() in [ "scans", "bk", "artwork", "images" ]: run([ "cjxl", "-q", "100", "--lossless_jpeg", "1", path, path.with_suffix(".jxl") ]) path.unlink(True) new_size = path.with_suffix(".jxl").stat().st_size elif path.suffix == ".png": if args.convert_jpegxl_from_png and path.parent.name.lower() in [ "scans", "bk", "artwork", "images" ]: run([ "cjxl", "-q", "100", path, path.with_suffix(".jxl") ]) path.unlink(True) new_size = path.with_suffix(".jxl").stat().st_size elif args.optimize_png: run([ "oxipng", path ]) new_size = path.stat().st_size elif path.suffix == ".flac": if args.extract_embedded: flac = FLAC(path) cover = next((picture for picture in flac.pictures if picture.type == PictureType.COVER_FRONT), None) if cover: cover_path = path.parent.joinpath("cover").with_suffix(mimetypes.guess_extension(cover.mime) or "unk") if not cover_path.exists(): cover_path.write_bytes(cover.data) if args.optimize_flac: run([ "flac", "-8", "-V", "-f", path ]) if args.optimize_flac or args.remove_embedded: run([ "metaflac", "--dont-use-padding", "--remove", "--block-type=PICTURE,PADDING" if args.remove_embedded else "--block-type=PADDING", path ]) run([ "metaflac", f"--add-padding={args.flac_padding}", path ]) new_size = path.stat().st_size elif path.name == "foo_dr.txt" or path.name == "Lossless Audio Checker.log": if args.delete_misc: path.unlink(True) new_size = 0 return SuccessResult(path, old_size, new_size) """Main""" parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter, add_help=False, description="Performs space saving optimizations as described on MH Wiki.") parser.add_argument("path", type=is_arg_folder, help="path to music folder") parser.add_argument("--help", action="help", default=argparse.SUPPRESS, help="show help and exit") parser.add_argument("--workers", metavar="N", type=int, default=math.floor(os.cpu_count() * .8), help="number of parallel jobs") parser.add_argument("--flac-padding", metavar="N", type=int, default=8192, help="bytes of padding to keep when optimizing FLAC files") parser.add_argument("--remove-embedded", action="store_true", help="remove embedded covers from FLAC files") parser.add_argument("--extract-embedded", action="store_true", help="extract embedded cover form FLAC files to image file") parser.add_argument("--convert-jpegxl-from-png", action="store_true", help="convert PNG scans to lossless JPEG XL files") parser.add_argument("--convert-jpegxl-from-jpg", action="store_true", help="convert JPEG scans to recompressed JPEG XL files") parser.add_argument("--optimize-flac", action="store_true", help="optimize FLAC files") parser.add_argument("--optimize-png", action="store_true", help="optimize PNG files") parser.add_argument("--delete-misc", action="store_true", help="delete dynamic range logs and transcode logs") args = parser.parse_args() if __name__ == "__main__": if args.optimize_flac and not shutil.which("flac"): print("Could not find flac!") exit(1) if (args.optimize_flac or args.remove_embedded) and not shutil.which("metaflac"): print("Could not find metaflac!") exit(1) if args.optimize_png and not shutil.which("oxipng"): print("Could not find oxipng!") exit(1) if (args.convert_jpegxl_from_jpg or args.convert_jpegxl_from_png) and not shutil.which("cjxl"): print("Could not find cjxl!") exit(1) if args.remove_alpha and not shutil.which("vips"): print("Could not find vips!") exit(1) print(f"Using {args.workers} workers...") count = 0 savings = 0 log_success = open("log_success.txt", "a+", encoding="utf-8", buffering=1) log_failure = open("log_failure.txt", "a+", encoding="utf-8", buffering=1) pool = Pool(args.workers, work_initializer) cancel = Event() def sigint_handler(signum, frame): print("Aborting...") cancel.set() signal.signal(signal.SIGINT, sigint_handler) for res in pool.imap(partial(work_handler, args), walk(args.path, cancel)): count += 1 if isinstance(res, SuccessResult): savings += res.savings log_success.write(str(res)) elif isinstance(res, FailureResult): log_failure.write(str(res)) if count % max(args.workers, 10) == 0: print(f" {count:>10} | {format_size(savings):>10} | {PurePath(res.path).relative_to(args.path).as_posix()}") print(f" {count:>10} | {format_size(savings):>10}") pool.close() log_success.close() log_failure.close()