From 0c7f244a999897a9a7739cbfd165db0052052fbe Mon Sep 17 00:00:00 2001 From: Alexander Wainwright Date: Fri, 19 Dec 2025 21:30:43 +1000 Subject: [PATCH] Refactor, add side-car and version --- src/emulsion/main.py | 275 +++++++++++++++++-------------------------- 1 file changed, 106 insertions(+), 169 deletions(-) diff --git a/src/emulsion/main.py b/src/emulsion/main.py index 47b6484..7c62484 100644 --- a/src/emulsion/main.py +++ b/src/emulsion/main.py @@ -4,12 +4,21 @@ import sys import subprocess import datetime import toml +import shlex # Used for properly quoting shell commands from alive_progress import alive_bar from concurrent.futures import ThreadPoolExecutor, as_completed +from importlib.metadata import version, PackageNotFoundError CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml") +def get_version(): + try: + return version('emulsion') + except PackageNotFoundError: + return 'unknown' + + def load_config(): if os.path.isfile(CONFIG_PATH): try: @@ -28,6 +37,12 @@ def parse_args(config): help='Image files to process (e.g. *.jpg *.tif).' ) + parser.add_argument( + '-v', '--version', + action='version', + version=f'%(prog)s {get_version()}' + ) + # Configurable fields parser.add_argument('--author', default=None, help='Name of the photographer.') parser.add_argument('--lab', default=None, help='Name of the lab who developed the film.') @@ -38,6 +53,7 @@ def parse_args(config): # Time settings parser.add_argument('--base-date', default=None, help='Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).') parser.add_argument('--time-increment', type=int, default=None, help='Time increment in seconds between images.') + parser.add_argument('--sidecar', action='store_true', help='Write to sidecar XMP files (e.g. image.jpg.xmp) instead of embedding.') parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without modifying files.') parser.add_argument('-j', '--workers', type=int, default=os.cpu_count() or 1, help='Number of parallel workers to run exiftool; defaults to number of CPUs.') parser.add_argument('--init-config', action='store_true', help='Create a default config file (if none exists) and exit.') @@ -45,97 +61,43 @@ def parse_args(config): args = parser.parse_args() # Merge from config - if args.author is None and 'author' in config: - args.author = config['author'] - - if args.lab is None and 'lab' in config: - args.lab = config['lab'] - - if args.make is None and 'make' in config: - args.make = config['make'] - - if args.model is None and 'model' in config: - args.model = config['model'] - - if args.film is None and 'film' in config: - args.film = config['film'] - - if args.time_increment is None and 'time_increment' in config: - args.time_increment = config['time_increment'] + keys = ['author', 'lab', 'make', 'model', 'film', 'time_increment'] + for k in keys: + if getattr(args, k) is None and k in config: + setattr(args, k, config[k]) return args def prompt_for_config(args): - """ - Prompt for config-only fields before creating a config file. - (Base date is ephemeral, not stored in config.) - """ try: if not args.author: args.author = input("Photographer's name (Author)? ").strip() - if args.lab is None: - resp = input("Lab name (optional, enter to skip)? ").strip() - args.lab = resp if resp else "" - + args.lab = input("Lab name (optional, enter to skip)? ").strip() if args.make is None: - resp = input("Camera make (optional, enter to skip)? ").strip() - args.make = resp if resp else "" - + args.make = input("Camera make (optional, enter to skip)? ").strip() if args.model is None: - resp = input("Camera model (optional, enter to skip)? ").strip() - args.model = resp if resp else "" - + args.model = input("Camera model (optional, enter to skip)? ").strip() if args.film is None: - resp = input("Film stock (optional, enter to skip)? ").strip() - args.film = resp if resp else "" - + args.film = input("Film stock (optional, enter to skip)? ").strip() if not args.time_increment: dflt = "60" resp = input(f"Time increment in seconds [{dflt}]: ").strip() args.time_increment = int(resp) if resp else int(dflt) - except KeyboardInterrupt: print("\nInterrupted by user. Exiting.") sys.exit(1) def prompt_if_missing(args): - """ - Prompt for ephemeral fields like base_date if missing, - and also fill in other fields if user didn't supply them. - """ + # Same prompts as config, plus base_date + prompt_for_config(args) try: - if not args.author: - args.author = input("Photographer's name (Author)? ").strip() - - if args.lab is None: - resp = input("Lab name (optional, enter to skip)? ").strip() - args.lab = resp if resp else "" - - if args.make is None: - resp = input("Camera make (optional, enter to skip)? ").strip() - args.make = resp if resp else "" - - if args.model is None: - resp = input("Camera model (optional, enter to skip)? ").strip() - args.model = resp if resp else "" - - if args.film is None: - resp = input("Film stock (optional, enter to skip)? ").strip() - args.film = resp if resp else "" - if not args.base_date: dflt = datetime.datetime.now().strftime("%Y-%m-%d") resp = input(f"Base date/time for first image [{dflt}]: ").strip() args.base_date = resp if resp else dflt - - if not args.time_increment: - dflt = "60" - resp = input(f"Time increment in seconds [{dflt}]: ").strip() - args.time_increment = int(resp) if resp else int(dflt) - except KeyboardInterrupt: print("\nInterrupted by user. Exiting.") sys.exit(1) @@ -149,60 +111,63 @@ def parse_user_date(dt_str): return datetime.datetime.strptime(dt_str, "%Y-%m-%d") -def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp, dry_run=False): +def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp): """ - Use standard EXIF fields: - - EXIF:Make (args.make) - - EXIF:Model (args.model) - - EXIF:UserComment (args.film) - Also store film in XMP:Description for better compatibility. + Builds the command list. Does NOT handle dry_run formatting. + Always returns a list for subprocess safety. """ current_year = datetime.datetime.now().year cmd = [ "exiftool", "-overwrite_original", - - # Photographer info f"-Artist={author}", f"-Creator={author}", f"-By-line={author}", f"-Credit={author}", f"-CopyrightNotice=© {current_year} {author}", f"-Copyright=© {current_year} {author}", - - # Timestamps f"-DateTimeOriginal={timestamp}", f"-CreateDate={timestamp}", - - # Clear out some lab fields "-WebStatement=", "-CreatorWorkURL=" ] - # Lab in XMP:DevelopedBy if lab: cmd.append(f"-XMP:DevelopedBy={lab}") - - # If user gave a make, store it in EXIF:Make if make: cmd.append(f"-Make={make}") - - # If user gave a model, store it in EXIF:Model if model: cmd.append(f"-Model={model}") - - # If user gave a film stock, store it in EXIF:UserComment AND XMP:Description if film: cmd.append(f"-UserComment={film}") cmd.append(f"-XMP:Description={film}") cmd.append(file_path) - - if dry_run: - return " ".join(cmd) return cmd +def run_exiftool(cmd, dry_run=False): + """ + Unified execution logic. + Returns: (success_bool, message_str) + """ + if dry_run: + # Return the safe shell command string + safe_cmd = shlex.join(cmd) + return True, f"[DRY RUN] {safe_cmd}" + + try: + subprocess.run( + cmd, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + return True, "Updated" + except subprocess.CalledProcessError as e: + return False, f"Error: {e}" + + def create_config_file(args): if os.path.exists(CONFIG_PATH): print("Config file already exists. Not overwriting.") @@ -217,20 +182,12 @@ def create_config_file(args): "time_increment": args.time_increment if args.time_increment else 60 } - # Remove empty values so user is prompted next time if they left something blank - keys_to_remove = [] - for k, v in defaults.items(): - if isinstance(v, str) and not v.strip(): - keys_to_remove.append(k) - - for k in keys_to_remove: - del defaults[k] + # Filter empty + defaults = {k: v for k, v in defaults.items() if not (isinstance(v, str) and not v.strip())} os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True) - with open(CONFIG_PATH, "w", encoding="utf-8") as f: toml.dump(defaults, f) - print(f"Created config file at {CONFIG_PATH}") sys.exit(0) @@ -239,10 +196,18 @@ def main(): try: config = load_config() args = parse_args(config) - # Default to number of CPUs if workers not specified + + # Defaults if args.workers is None: args.workers = os.cpu_count() or 1 + # LOGIC SIMPLIFICATION 1: + # Instead of writing a separate sequential loop for dry-runs, + # we just force workers=1 here. The executor handles the rest. + if args.dry_run: + print("Dry run detected: Forcing sequential processing.") + args.workers = 1 + if args.init_config: prompt_for_config(args) create_config_file(args) @@ -262,94 +227,66 @@ def main(): files = sorted(args.files) total_files = len(files) time_increment = args.time_increment if args.time_increment else 60 - current_dt = base_dt print(f"Processing {total_files} file(s)...") with alive_bar(total_files, title="Tagging files") as bar: - if args.workers > 1 and not args.dry_run: - executor = ThreadPoolExecutor(max_workers=args.workers) - futures = {} - supported_idx = 0 - for f in files: - ext = os.path.splitext(f)[1].lower() - if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']: - bar.text(f"Skipping unsupported file: {f}") - bar() - continue - ts_dt = base_dt + datetime.timedelta(seconds=supported_idx * time_increment) - timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S") - cmd = build_exiftool_cmd( - file_path=f, - author=args.author, - lab=args.lab, - make=args.make, - model=args.model, - film=args.film, - timestamp=timestamp_str, - dry_run=False - ) - future = executor.submit( - subprocess.run, cmd, check=True, - stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL - ) - futures[future] = (f, timestamp_str) - supported_idx += 1 + # 1. Prepare all tasks first + tasks = [] + for i, f in enumerate(files): + ext = os.path.splitext(f)[1].lower() + if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']: + continue + + ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment) + timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S") + + if args.sidecar: + target_file_path = f"{f}.xmp" + else: + target_file_path = f + + cmd = build_exiftool_cmd( + file_path=target_file_path, + author=args.author, + lab=args.lab, + make=args.make, + model=args.model, + film=args.film, + timestamp=timestamp_str + ) + + tasks.append((cmd, f, timestamp_str)) + + with ThreadPoolExecutor(max_workers=args.workers) as executor: + # Submit all tasks + # Note: We pass args.dry_run into the function here + futures = { + executor.submit(run_exiftool, cmd, args.dry_run): (f, ts) + for cmd, f, ts in tasks + } + + # Process results as they complete for future in as_completed(futures): - f, ts = futures[future] - try: - future.result() - bar.text(f"Updated {f} => {ts}") - except subprocess.CalledProcessError as e: - bar.text(f"Failed to update {f}: {e}") - bar() - - executor.shutdown(wait=True) - else: - supported_idx = 0 - current_dt = base_dt - for f in files: - ext = os.path.splitext(f.lower())[1] - if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']: - bar.text(f"Skipping unsupported file: {f}") - bar() - continue - - timestamp_str = current_dt.strftime("%Y:%m:%d %H:%M:%S") - cmd = build_exiftool_cmd( - file_path=f, - author=args.author, - lab=args.lab, - make=args.make, - model=args.model, - film=args.film, - timestamp=timestamp_str, - dry_run=args.dry_run - ) + original_file, ts = futures[future] + success, msg = future.result() if args.dry_run: - bar.text(f"DRY RUN: {cmd}") + # For dry run, we PRINT the command so the user can copy it + # bar.text() is transient, print() persists in terminal + print(msg) + elif not success: + bar.text(f"Failed {original_file}: {msg}") else: - try: - subprocess.run( - cmd, - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL - ) - bar.text(f"Updated {f} => {timestamp_str}") - except subprocess.CalledProcessError as e: - bar.text(f"Failed to update {f}: {e}") + bar.text(f"Updated {original_file} => {ts}") - current_dt += datetime.timedelta(seconds=time_increment) bar() except KeyboardInterrupt: print("\nInterrupted by user. Exiting.") sys.exit(1) - if __name__ == '__main__': main()