diff --git a/src/emulsion/config.py b/src/emulsion/config.py new file mode 100644 index 0000000..8cb8339 --- /dev/null +++ b/src/emulsion/config.py @@ -0,0 +1,112 @@ +import copy +import os + +import toml + +# TODO: proper path loading +CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml") + +# Default Schema +# This defines the "First Class" feel of the app, but is fully overridable. +DEFAULT_CONFIG = { + "sidecar": { + "extension": ".xmp" + }, + "defaults": { + "time_increment": 60 + }, + "mappings": { + "author": { + "flags": [ + "-Artist={value}", + "-Creator={value}", + "-By-line={value}", + "-Credit={value}", + "-CopyrightNotice=© {year} {value}", + "-Copyright=© {year} {value}" + ], + "prompt": True, + "help": "Name of the photographer" + }, + "lab": { + "flags": ["-XMP:DevelopedBy={value}"], + "prompt": True, + "help": "Lab name" + }, + "make": { + "flags": ["-Make={value}"], + "prompt": True, + "help": "Camera make" + }, + "model": { + "flags": ["-Model={value}"], + "prompt": True, + "help": "Camera model" + }, + "lens": { + "flags": ["-LensModel={value}", "-Lens={value}"], + "prompt": True, + "help": "Lens model" + }, + "film": { + "flags": ["-UserComment={value}", "-XMP:Description={value}"], + "prompt": False, + "help": "Film stock" + } + } +} + +class ConfigLoader: + def __init__(self, path=CONFIG_PATH): + self.path = path + self.config = copy.deepcopy(DEFAULT_CONFIG) + + def load(self): + """ + Loads the config from disk and merges it into the defaults. + Returns the full config dictionary. + """ + if os.path.isfile(self.path): + try: + user_config = toml.load(self.path) + self._merge(self.config, user_config) + except Exception as e: + # We might want to let the caller handle this, or just print warning + print(f"Warning: Could not parse config file at {self.path}: {e}") + + return self.config + + def _merge(self, base, update): + """ + Recursively merges 'update' dict into 'base' dict. + """ + # TODO: this is fine, but i do wonder if there is a way to do this with + # a set operation rather than a for-loop. perhaps not in a way that + # maintains the checks... + for key, value in update.items(): + if isinstance(value, dict) and key in base and isinstance(base[key], dict): + self._merge(base[key], value) + else: + base[key] = value + + def save_defaults(self, current_defaults): + """ + Helpers to write a new config file (for --init-config). This is a bit + tricky because we don't want to just dump the massive DEFAULT_CONFIG. We + likely want to write a file that reflects what the user *currently* has + + defaults. + """ + # For now, simplistic implementation: Dump the merged state. + # In a real app, we might want to preserve comments etc, but TOML lib + # doesn't do that. + + # Ensure directory exists + os.makedirs(os.path.dirname(self.path), exist_ok=True) + + # We probably only want to write if it doesn't exist, to avoid clobbering. + if os.path.exists(self.path): + return False + + with open(self.path, "w", encoding="utf-8") as f: + toml.dump(current_defaults, f) + return True diff --git a/src/emulsion/executor.py b/src/emulsion/executor.py new file mode 100644 index 0000000..3af52b2 --- /dev/null +++ b/src/emulsion/executor.py @@ -0,0 +1,188 @@ +import datetime +import os +import shlex +import subprocess +from concurrent.futures import ThreadPoolExecutor, as_completed + +from alive_progress import alive_bar + + +class Executor: + def __init__(self, config): + self.config = config + self.mappings = config.get('mappings', {}) + self.sidecar_ext = config.get('sidecar', {}).get('extension', '.xmp') + if not self.sidecar_ext.startswith('.'): + self.sidecar_ext = f".{self.sidecar_ext}" + + def run_batch(self, files, resolved_values, options): + """ + Main execution entry point. + files: List of file paths + resolved_values: Dictionary of final field values + options: Dictionary/Namespace of process options (dry_run, workers, + base_date, etc.) + """ + extensions = ['.jpg', '.jpeg', '.tif', '.tiff'] + # Filter supported files + valid_files = [ + f for f in files + if os.path.splitext(f)[1].lower() in extensions + ] + + if not valid_files: + print("No valid image files found to process.") + return + + # Sort for consistent time incrementing + valid_files.sort() + total_files = len(valid_files) + + # Parse base date + try: + base_dt = self._parse_date(options.base_date) + except ValueError: + print( + f"Error: Base date '{options.base_date}' must be " + "'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'." + ) + return + + time_increment = options.time_increment or 60 + workers = options.workers or 1 + dry_run = options.dry_run + + print(f"Processing {total_files} file(s)...") + + # Prepare tasks + tasks = [] + for i, f in enumerate(valid_files): + # Calculate timestamp + ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment) + timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S") + + # Determine file targets (Sidecar logic) + target_path, sidecar_source = self._determine_paths( + f, options.embed + ) + + # Build Command + cmd = self._build_cmd( + target_path, + resolved_values, + timestamp_str, + sidecar_source + ) + tasks.append((cmd, f, timestamp_str)) + + # Execute + with ( + alive_bar(total_files, title="Tagging files") as bar, + ThreadPoolExecutor(max_workers=workers) as executor + ): + futures = { + executor.submit(self._run_exiftool, cmd, dry_run): (f, ts) + for cmd, f, ts in tasks + } + + for future in as_completed(futures): + original_file, ts = futures[future] + success, msg = future.result() + + if dry_run: + print(msg) + elif not success: + bar.text(f"Failed {original_file}: {msg}") + else: + bar.text(f"Updated {original_file} => {ts}") + + bar() + + def _determine_paths(self, original_file, embed): + """ + Returns (target_path, sidecar_source_if_needed) + """ + if embed: + return original_file, None + + target_path = f"{original_file}{self.sidecar_ext}" + + # If sidecar doesn't exist, we need to tell ExifTool to read from source + # and write to the new sidecar file. + if not os.path.exists(target_path): + return target_path, original_file + + return target_path, None + + def _build_cmd( + self, file_path, field_values, timestamp_str, sidecar_source=None + ): + current_year = datetime.datetime.now().year + + # Core setup + cmd = [ + "exiftool", + "-overwrite_original", + f"-DateTimeOriginal={timestamp_str}", + f"-CreateDate={timestamp_str}", + "-WebStatement=", + "-CreatorWorkURL=" + ] + + # Add mapped fields + for field_name, val in field_values.items(): + if field_name in self.mappings: + schema = self.mappings[field_name] + # Schema can be dict (new style) or list (old style/simple) + flags = ( + schema.get('flags', []) + if isinstance(schema, dict) + else schema + ) + + # Ensure flags is a list (just in case) + if isinstance(flags, list): + for flag in flags: + safe_flag = flag.replace( + '{value}', + str(val)).replace('{year}', str(current_year) + ) + cmd.append(safe_flag) + + # Sidecar handling + if sidecar_source: + # -srcfile SOURCE TARGET + cmd.append("-srcfile") + cmd.append(file_path) + cmd.append(sidecar_source) + else: + cmd.append(file_path) + + return cmd + + def _run_exiftool(self, cmd, dry_run): + if dry_run: + safe_cmd = shlex.join(cmd) + return True, f"[DRY RUN] {safe_cmd}" + + try: + subprocess.run( # noqa: S603 + cmd, + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + return True, "Updated" + except subprocess.CalledProcessError as e: + return False, f"Error: {e}" + except FileNotFoundError: + return False, "Error: 'exiftool' not found. Please install it." + + def _parse_date(self, dt_str): + if not dt_str: + # Should be handled by resolver/validator, but safe fallback + return datetime.datetime.now() + dt_str = dt_str.strip() + if " " in dt_str: + return datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S") + return datetime.datetime.strptime(dt_str, "%Y-%m-%d") diff --git a/src/emulsion/main.py b/src/emulsion/main.py index 852a921..7409d97 100644 --- a/src/emulsion/main.py +++ b/src/emulsion/main.py @@ -1,15 +1,12 @@ import argparse +import datetime import os import sys -import subprocess -import datetime -import toml -import shlex # Used for properly quoting shell commands -from alive_progress import alive_bar -from concurrent.futures import ThreadPoolExecutor, as_completed -from importlib.metadata import version, PackageNotFoundError +from importlib.metadata import PackageNotFoundError, version -CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml") +from emulsion.config import ConfigLoader +from emulsion.executor import Executor +from emulsion.resolver import ValueResolver def get_version(): @@ -19,296 +16,217 @@ def get_version(): return 'unknown' -def load_config(): - if os.path.isfile(CONFIG_PATH): - try: - return toml.load(CONFIG_PATH) - except Exception as e: - print(f"Warning: Could not parse config file: {e}") - return {} - - -def parse_args(config): - parser = argparse.ArgumentParser(description='A tool for updating exif tags') - - parser.add_argument( - 'files', - nargs='*', - help='Image files to process (e.g. *.jpg *.tif).' +def parse_args(): + parser = argparse.ArgumentParser( + description='A tool for updating exif tags' ) parser.add_argument( - '-v', '--version', - action='version', - version=f'%(prog)s {get_version()}' + 'files', nargs='*', help='Image files to process (e.g. *.jpg *.tif).' ) - # Configurable fields - parser.add_argument('--author', default=None, help='Name of the photographer.') - parser.add_argument('--lab', default=None, help='Name of the lab who developed the film.') - parser.add_argument('--make', default=None, help='Camera make (stored in EXIF:Make).') - parser.add_argument('--model', default=None, help='Camera model (stored in EXIF:Model).') - parser.add_argument('--film', default=None, help='Film stock (stored in EXIF:UserComment and XMP:Description).') - - # Time settings - parser.add_argument('--base-date', default=None, help='Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).') - parser.add_argument('--time-increment', type=int, default=None, help='Time increment in seconds between images.') - parser.add_argument('--embed', action='store_true', help='Embed EXIF data directly into the image file instead of a sidecar.') - parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without modifying files.') parser.add_argument( - '-j', '--workers', + '-v', '--version', action='version', version=f'%(prog)s {get_version()}' + ) + + # --- First-Class Fields --- + parser.add_argument('--author', help='Name of the photographer.') + parser.add_argument('--lab', help='Name of the lab who developed the film.') + parser.add_argument('--make', help='Camera make.') + parser.add_argument('--model', help='Camera model.') + parser.add_argument('--lens', help='Lens model.') + parser.add_argument('--film', help='Film stock.') + + # --- Generic/Custom Fields --- + parser.add_argument( + '--field', + action='append', + dest='custom_fields', + metavar='KEY=VALUE', + help=( + 'Set a custom field defined in config (e.g., ' + '--field location="Paris").' + ), + ) + + # --- Process Control --- + parser.add_argument( + '--base-date', + default=None, + help=( + 'Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).' + ) + ) + + parser.add_argument( + '--time-increment', + type=int, + default=None, + help='Time increment in seconds between images.', + ) + + parser.add_argument( + '--embed', + action='store_true', + help=( + 'Embed EXIF data directly into the image file instead of a ' + 'sidecar.' + ) + ) + + parser.add_argument( + '--dry-run', + action='store_true', + help='Show what would be changed without modifying files.', + ) + + parser.add_argument( + '-j', + '--workers', type=int, nargs='?', const=os.cpu_count() or 1, default=os.cpu_count() or 1, - help='Number of parallel workers to run exiftool; defaults to number of CPUs.' - ) - parser.add_argument('--init-config', action='store_true', help='Create a default config file (if none exists) and exit.') - - args = parser.parse_args() - - # Merge from config - keys = ['author', 'lab', 'make', 'model', 'film', 'time_increment'] - for k in keys: - if getattr(args, k) is None and k in config: - setattr(args, k, config[k]) - - return args - - -def prompt_for_config(args): - try: - if not args.author: - args.author = input("Photographer's name (Author)? ").strip() - if args.lab is None: - args.lab = input("Lab name (optional, enter to skip)? ").strip() - if args.make is None: - args.make = input("Camera make (optional, enter to skip)? ").strip() - if args.model is None: - args.model = input("Camera model (optional, enter to skip)? ").strip() - if args.film is None: - args.film = input("Film stock (optional, enter to skip)? ").strip() - if not args.time_increment: - dflt = "60" - resp = input(f"Time increment in seconds [{dflt}]: ").strip() - args.time_increment = int(resp) if resp else int(dflt) - except KeyboardInterrupt: - print("\nInterrupted by user. Exiting.") - sys.exit(1) - - -def prompt_if_missing(args): - # Same prompts as config, plus base_date - prompt_for_config(args) - try: - while not args.base_date: - dflt = datetime.datetime.now().strftime("%Y-%m-%d") - resp = input(f"Base date/time for first image [{dflt}]: ").strip() - args.base_date = resp if resp else dflt - # Validate immediately so we don't crash later - try: - parse_user_date(args.base_date) - except ValueError: - print("Invalid format. Please use 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.") - args.base_date = None - except KeyboardInterrupt: - print("\nInterrupted by user. Exiting.") - sys.exit(1) - - -def parse_user_date(dt_str): - dt_str = dt_str.strip() - if " " in dt_str: - return datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S") - else: - return datetime.datetime.strptime(dt_str, "%Y-%m-%d") - - -def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp, sidecar_source=None): - """ - Builds the command list. - sidecar_source: If set, we are creating/updating a sidecar FROM this source image. - """ - current_year = datetime.datetime.now().year - cmd = [ - "exiftool", - "-overwrite_original", - f"-Artist={author}", - f"-Creator={author}", - f"-By-line={author}", - f"-Credit={author}", - f"-CopyrightNotice=© {current_year} {author}", - f"-Copyright=© {current_year} {author}", - f"-DateTimeOriginal={timestamp}", - f"-CreateDate={timestamp}", - "-WebStatement=", - "-CreatorWorkURL=" - ] - - if lab: - cmd.append(f"-XMP:DevelopedBy={lab}") - if make: - cmd.append(f"-Make={make}") - if model: - cmd.append(f"-Model={model}") - if film: - cmd.append(f"-UserComment={film}") - cmd.append(f"-XMP:Description={film}") - - if sidecar_source: - # Advanced ExifTool usage: read source, write to specific sidecar file - # This ensures it works even if the sidecar doesn't exist yet - cmd.append(f"-srcfile") - cmd.append(file_path) - cmd.append(sidecar_source) - else: - cmd.append(file_path) - - return cmd - - -def run_exiftool(cmd, dry_run=False): - """ - Unified execution logic. - Returns: (success_bool, message_str) - """ - if dry_run: - # Return the safe shell command string - safe_cmd = shlex.join(cmd) - return True, f"[DRY RUN] {safe_cmd}" - - try: - subprocess.run( - cmd, - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + help=( + 'Number of parallel workers to run exiftool; defaults to number ' + 'of CPUs.' ) - return True, "Updated" - except subprocess.CalledProcessError as e: - return False, f"Error: {e}" - except FileNotFoundError: - return False, "Error: 'exiftool' not found. Please install it." + ) + + parser.add_argument( + '--init-config', + action='store_true', + help='Create a default config file (if none exists) and exit.', + ) + + parser.add_argument( + '--no-interaction', + action='store_true', + help='Do not prompt for missing fields (skip them if missing).', + ) + + return parser.parse_args() -def create_config_file(args): - if os.path.exists(CONFIG_PATH): - print("Config file already exists. Not overwriting.") - sys.exit(0) +def prompt_for_defaults(config): + """ + Prompts the user for default values to populate the initial config. + """ + print("Initializing configuration. Press Enter to skip any field.") + + # We'll iterate over the 'mappings' to find what fields are available, + # but we'll prioritize the 'core' ones for a better UX order. + core_fields = ['author', 'lab', 'make', 'model', 'lens', 'film'] + mappings = config.get('mappings', {}) + defaults = config.setdefault('defaults', {}) - defaults = { - "author": args.author or "Your Name", - "lab": args.lab or "", - "make": args.make or "", - "model": args.model or "", - "film": args.film or "", - "time_increment": args.time_increment if args.time_increment else 60 - } - - # Filter empty - defaults = {k: v for k, v in defaults.items() if not (isinstance(v, str) and not v.strip())} - - os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True) - with open(CONFIG_PATH, "w", encoding="utf-8") as f: - toml.dump(defaults, f) - print(f"Created config file at {CONFIG_PATH}") - sys.exit(0) + # Prompt for core fields first + for field in core_fields: + if field in mappings: + schema = mappings[field] + help_text = schema.get('help', field) if isinstance(schema, dict) else field + val = input(f"Default {help_text} (optional): ").strip() + if val: + defaults[field] = val + + # Time increment + dflt_inc = defaults.get('time_increment', 60) + val = input(f"Default Time Increment [seconds] ({dflt_inc}): ").strip() + if val: + try: + defaults['time_increment'] = int(val) + except ValueError: + print("Invalid number, keeping default.") def main(): try: - config = load_config() - args = parse_args(config) + # 1. Load Config + loader = ConfigLoader() + config = loader.load() - # Defaults - if args.workers is None: - args.workers = os.cpu_count() or 1 - - if args.dry_run: - print("Dry run detected: Forcing sequential processing.") - args.workers = 1 + # 2. Parse CLI + args = parse_args() + # Handle Initialization if args.init_config: - prompt_for_config(args) - create_config_file(args) + if os.path.exists(loader.path): + print(f"Config file already exists at {loader.path}. Not overwriting.") + sys.exit(0) + + # Prompt user for initial values + try: + prompt_for_defaults(config) + except KeyboardInterrupt: + print("\nAborted.") + sys.exit(1) - if not args.files: - print("No files provided.") + if loader.save_defaults(config): + print(f'Created config file at {loader.path}') + else: + # Should be caught by the check above, but for safety + print('Config file already exists. Not overwriting.') sys.exit(0) - prompt_if_missing(args) + if not args.files: + print('No files provided.') + sys.exit(0) - try: - base_dt = parse_user_date(args.base_date) - except ValueError: - print(f"Error: Base date '{args.base_date}' must be 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.") + # Handle Base Date Prompt logic + if not args.base_date and not args.no_interaction: + dflt = datetime.datetime.now().strftime('%Y-%m-%d') + resp = input(f'Base date/time for first image [{dflt}]: ').strip() + args.base_date = resp if resp else dflt + + if not args.base_date: + print('Error: Base date is required.') sys.exit(1) - files = sorted(args.files) - total_files = len(files) - time_increment = args.time_increment if args.time_increment else 60 + # 3. Prepare Inputs for Resolver + # We need to mash --author and --field author=... into one dict + user_inputs = {} - print(f"Processing {total_files} file(s)...") + # First-Class args + for field in ['author', 'lab', 'make', 'model', 'lens', 'film']: + val = getattr(args, field, None) + if val: + user_inputs[field] = val - with alive_bar(total_files, title="Tagging files") as bar: - - # 1. Prepare all tasks first - tasks = [] - - for i, f in enumerate(files): - ext = os.path.splitext(f)[1].lower() - if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']: - continue - - ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment) - timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S") - - sidecar_source = None - if not args.embed: - target_file_path = f"{f}.xmp" - # If sidecar doesn't exist, we must tell ExifTool to create it from the source image - if not os.path.exists(target_file_path): - sidecar_source = f + # Custom args + if args.custom_fields: + for item in args.custom_fields: + if '=' in item: + key, val = item.split('=', 1) + user_inputs[key.strip()] = val.strip() else: - target_file_path = f + print( + f"Warning: Invalid format for --field '{item}'. " + "Expected KEY=VALUE." + ) - cmd = build_exiftool_cmd( - file_path=target_file_path, - author=args.author, - lab=args.lab, - make=args.make, - model=args.model, - film=args.film, - timestamp=timestamp_str, - sidecar_source=sidecar_source - ) + # 4. Resolve Metadata + resolver = ValueResolver(config) + resolved_values = resolver.resolve( + user_inputs, interactive=not args.no_interaction + ) - tasks.append((cmd, f, timestamp_str)) + # 5. Execute + executor = Executor(config) - with ThreadPoolExecutor(max_workers=args.workers) as executor: - # Submit all tasks - futures = { - executor.submit(run_exiftool, cmd, args.dry_run): (f, ts) - for cmd, f, ts in tasks - } + # We pass 'args' as the options object (has dry_run, workers, etc) + # Just need to make sure time_increment is resolved from config defaults + # if missing + if args.time_increment is None: + args.time_increment = config.get('defaults', {}).get( + 'time_increment', 60 + ) - # Process results as they complete - for future in as_completed(futures): - original_file, ts = futures[future] - success, msg = future.result() - - if args.dry_run: - print(msg) - elif not success: - bar.text(f"Failed {original_file}: {msg}") - else: - bar.text(f"Updated {original_file} => {ts}") - - bar() + executor.run_batch(args.files, resolved_values, args) except KeyboardInterrupt: - print("\nInterrupted by user. Exiting.") + print('\nInterrupted by user. Exiting.') sys.exit(1) + if __name__ == '__main__': - main() + main() \ No newline at end of file diff --git a/src/emulsion/resolver.py b/src/emulsion/resolver.py new file mode 100644 index 0000000..9bc3c22 --- /dev/null +++ b/src/emulsion/resolver.py @@ -0,0 +1,86 @@ +import sys + + +class ValueResolver: + def __init__(self, config): + """ + config: The loaded configuration dictionary containing 'mappings' and + 'defaults'. + """ + self.mappings = config.get('mappings', {}) + self.defaults = config.get('defaults', {}) + + def resolve(self, cli_args, interactive=True): + """ + Resolves the final values for all fields. + + Strategy: + 1. Start with Config Defaults. + 2. Overlay CLI Arguments. + 3. Identify fields that require prompting (prompt=True in config). + 4. If interactive, prompt for missing required fields. + 5. Return final dictionary of {field: value}. + """ + # 1. Start with Defaults + # We filter defaults to only include things that might be fields (or + # core settings) + # Actually, 'defaults' in config might mix settings (time_increment) and + # fields (author). + # The executor will ignore keys it doesn't understand, so it's safe to + # pass all. + resolved = self.defaults.copy() + + # 2. Overlay CLI Inputs + # cli_args is expected to be a dict of {key: value} provided by the user. + # This merges both --author and --field author=... + for key, val in cli_args.items(): + if val is not None: + resolved[key] = val + + # 3. Identify Prompts + # We look at the 'mappings' to see which fields want to be prompted. + # TODO: also here i wonder if there is a cleaner approach with a filter + # or something + if interactive: + fields_to_prompt = [] + for field_name, schema in self.mappings.items(): + # Check if prompt is requested + if (isinstance(schema, dict) + and schema.get('prompt', False) + # Check if we already have a value + and (field_name not in resolved or not resolved[field_name]) + ): + fields_to_prompt.append(field_name) + + # Sort for stability (or maybe define priority in config later?) + fields_to_prompt.sort() + + # 4. Prompt Loop + try: + for field in fields_to_prompt: + self._prompt_user(field, resolved) + except KeyboardInterrupt: + print("\nInterrupted. Exiting.") + sys.exit(1) + + # Remove any fields that are still None/Empty (optional, but cleaner) + return {k: v for k, v in resolved.items() if v} + + def _prompt_user(self, field_name, resolved_dict): + """ + Helper to prompt a single field. + """ + schema = self.mappings.get(field_name, {}) + help_text = ( + schema.get('help', field_name) + if isinstance(schema, dict) + else field_name + ) + + # We capitalize the field name for the prompt label if help text matches + # name + label = help_text + + val = input(f"{label} (Optional): ").strip() + if val: + resolved_dict[field_name] = val