Initial commit of restructure

This commit is contained in:
Alexander Wainwright
2025-12-27 11:58:03 +10:00
parent 37fbce61c9
commit 91cc408d34
4 changed files with 566 additions and 262 deletions

112
src/emulsion/config.py Normal file
View File

@@ -0,0 +1,112 @@
import copy
import os
import toml
# TODO: proper path loading (e.g. respect XDG_CONFIG_HOME rather than hard-coding ~/.config)
CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml")

# Default Schema
# This defines the "First Class" feel of the app, but is fully overridable.
# Each entry under "mappings" describes one metadata field:
#   flags  - exiftool arguments; "{value}" is substituted with the user's
#            input and "{year}" with the current year at command-build time
#   prompt - whether to ask for the field interactively when it has no value
#   help   - human-readable description shown in prompts
DEFAULT_CONFIG = {
    "sidecar": {
        # Suffix for sidecar files written next to the images.
        "extension": ".xmp"
    },
    "defaults": {
        # Seconds added between consecutive images' timestamps.
        "time_increment": 60
    },
    "mappings": {
        "author": {
            "flags": [
                "-Artist={value}",
                "-Creator={value}",
                "-By-line={value}",
                "-Credit={value}",
                "-CopyrightNotice=© {year} {value}",
                "-Copyright=© {year} {value}"
            ],
            "prompt": True,
            "help": "Name of the photographer"
        },
        "lab": {
            "flags": ["-XMP:DevelopedBy={value}"],
            "prompt": True,
            "help": "Lab name"
        },
        "make": {
            "flags": ["-Make={value}"],
            "prompt": True,
            "help": "Camera make"
        },
        "model": {
            "flags": ["-Model={value}"],
            "prompt": True,
            "help": "Camera model"
        },
        "lens": {
            "flags": ["-LensModel={value}", "-Lens={value}"],
            "prompt": True,
            "help": "Lens model"
        },
        "film": {
            "flags": ["-UserComment={value}", "-XMP:Description={value}"],
            "prompt": False,
            "help": "Film stock"
        }
    }
}
class ConfigLoader:
    """Loads the emulsion TOML config file and merges it over DEFAULT_CONFIG."""

    def __init__(self, path=CONFIG_PATH):
        # Deep-copy so that merging user values never mutates DEFAULT_CONFIG.
        self.path = path
        self.config = copy.deepcopy(DEFAULT_CONFIG)

    def load(self):
        """
        Load the config from disk and merge it into the defaults.

        Returns the full (merged) config dictionary. A missing file is not an
        error; an unparseable file produces a warning and the defaults are
        returned unchanged.
        """
        if os.path.isfile(self.path):
            try:
                user_config = toml.load(self.path)
                self._merge(self.config, user_config)
            except Exception as e:
                # Deliberately non-fatal: a broken config should not stop the
                # tool, but the user needs to know their overrides were ignored.
                print(f"Warning: Could not parse config file at {self.path}: {e}")
        return self.config

    def _merge(self, base, update):
        """
        Recursively merge 'update' dict into 'base' dict, in place.

        Nested dicts are merged key-by-key; any other value in 'update'
        replaces the corresponding value in 'base'.
        """
        for key, value in update.items():
            if isinstance(value, dict) and isinstance(base.get(key), dict):
                self._merge(base[key], value)
            else:
                base[key] = value

    def save_defaults(self, current_defaults):
        """
        Write a new config file (for --init-config).

        Dumps 'current_defaults' as TOML at self.path. Refuses to clobber an
        existing file. Returns True if the file was written, False otherwise.
        Note: the TOML library does not preserve comments, so this is a plain
        dump of the merged state.
        """
        # Fix: check for an existing file BEFORE creating the directory, so a
        # refused (no-op) call leaves the filesystem completely untouched.
        if os.path.exists(self.path):
            return False
        os.makedirs(os.path.dirname(self.path), exist_ok=True)
        with open(self.path, "w", encoding="utf-8") as f:
            toml.dump(current_defaults, f)
        return True

188
src/emulsion/executor.py Normal file
View File

@@ -0,0 +1,188 @@
import datetime
import os
import shlex
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from alive_progress import alive_bar
class Executor:
    """Builds exiftool command lines and runs them over a batch of images."""

    # File types we know how to tag; everything else is filtered out.
    SUPPORTED_EXTENSIONS = ('.jpg', '.jpeg', '.tif', '.tiff')

    def __init__(self, config):
        """
        config: merged configuration dict. 'mappings' describes the exiftool
        flags per field; 'sidecar.extension' is the sidecar file suffix.
        """
        self.config = config
        self.mappings = config.get('mappings', {})
        self.sidecar_ext = config.get('sidecar', {}).get('extension', '.xmp')
        # Normalize so users may write either "xmp" or ".xmp" in the config.
        if not self.sidecar_ext.startswith('.'):
            self.sidecar_ext = f".{self.sidecar_ext}"

    def run_batch(self, files, resolved_values, options):
        """
        Main execution entry point.

        files: list of file paths.
        resolved_values: dict of final field values.
        options: namespace-like object with process options (dry_run, workers,
            base_date, time_increment, embed).
        """
        # Filter to supported types and sort for consistent time incrementing.
        valid_files = sorted(
            f for f in files
            if os.path.splitext(f)[1].lower() in self.SUPPORTED_EXTENSIONS
        )
        if not valid_files:
            print("No valid image files found to process.")
            return
        total_files = len(valid_files)
        try:
            base_dt = self._parse_date(options.base_date)
        except ValueError:
            print(
                f"Error: Base date '{options.base_date}' must be "
                "'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'."
            )
            return
        # Fix: 'options.time_increment or 60' silently turned an explicit
        # 0-second increment into 60; only fall back when it is truly unset.
        time_increment = (
            options.time_increment if options.time_increment is not None else 60
        )
        workers = options.workers or 1
        dry_run = options.dry_run
        print(f"Processing {total_files} file(s)...")
        # Prepare one (cmd, original_file, timestamp) task per file up front.
        tasks = []
        for i, f in enumerate(valid_files):
            # Timestamps advance by time_increment seconds per frame so the
            # sorted file order is reflected in the EXIF dates.
            ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment)
            timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S")
            target_path, sidecar_source = self._determine_paths(
                f, options.embed
            )
            cmd = self._build_cmd(
                target_path, resolved_values, timestamp_str, sidecar_source
            )
            tasks.append((cmd, f, timestamp_str))
        # Execute the batch with a progress bar and a worker pool.
        with (
            alive_bar(total_files, title="Tagging files") as bar,
            ThreadPoolExecutor(max_workers=workers) as executor
        ):
            futures = {
                executor.submit(self._run_exiftool, cmd, dry_run): (f, ts)
                for cmd, f, ts in tasks
            }
            for future in as_completed(futures):
                original_file, ts = futures[future]
                success, msg = future.result()
                if dry_run:
                    print(msg)
                elif not success:
                    bar.text(f"Failed {original_file}: {msg}")
                else:
                    bar.text(f"Updated {original_file} => {ts}")
                bar()

    def _determine_paths(self, original_file, embed):
        """
        Return (target_path, sidecar_source_if_needed).

        When embedding, exiftool writes into the image itself. Otherwise the
        target is a sidecar file; if that sidecar does not exist yet, the
        original image is returned as the source exiftool should read from.
        """
        if embed:
            return original_file, None
        target_path = f"{original_file}{self.sidecar_ext}"
        if not os.path.exists(target_path):
            return target_path, original_file
        return target_path, None

    def _build_cmd(self, file_path, field_values, timestamp_str,
                   sidecar_source=None):
        """
        Build the exiftool argument list for one file.

        file_path: file exiftool should write (image or sidecar).
        field_values: {field: value}; only fields present in self.mappings
            contribute flags. '{value}' and '{year}' placeholders in the
            configured flags are substituted.
        timestamp_str: 'YYYY:MM:DD HH:MM:SS' value for the date tags.
        sidecar_source: if set, the sidecar at file_path is created/updated
            from this source image via exiftool's -srcfile mechanism.
        """
        current_year = datetime.datetime.now().year
        cmd = [
            "exiftool",
            "-overwrite_original",
            f"-DateTimeOriginal={timestamp_str}",
            f"-CreateDate={timestamp_str}",
            "-WebStatement=",
            "-CreatorWorkURL="
        ]
        for field_name, val in field_values.items():
            if field_name not in self.mappings:
                continue
            schema = self.mappings[field_name]
            # Schema can be a dict (new style) or a bare list of flags.
            flags = (
                schema.get('flags', []) if isinstance(schema, dict) else schema
            )
            if not isinstance(flags, list):
                continue
            for flag in flags:
                cmd.append(
                    flag.replace('{value}', str(val))
                        .replace('{year}', str(current_year))
                )
        if sidecar_source:
            # -srcfile makes exiftool write file_path while processing
            # sidecar_source (see _determine_paths).
            cmd.extend(["-srcfile", file_path, sidecar_source])
        else:
            cmd.append(file_path)
        return cmd

    def _run_exiftool(self, cmd, dry_run):
        """
        Run one exiftool command. Returns (success_bool, message_str).

        In dry-run mode nothing is executed; the shell-quoted command line is
        returned instead.
        """
        if dry_run:
            safe_cmd = shlex.join(cmd)
            return True, f"[DRY RUN] {safe_cmd}"
        try:
            subprocess.run(  # noqa: S603
                cmd,
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            return True, "Updated"
        except subprocess.CalledProcessError as e:
            return False, f"Error: {e}"
        except FileNotFoundError:
            return False, "Error: 'exiftool' not found. Please install it."

    def _parse_date(self, dt_str):
        """
        Parse 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' into a datetime.

        Raises ValueError for any other format. An empty/None input falls
        back to 'now' (should be caught by the resolver, kept for safety).
        """
        if not dt_str:
            return datetime.datetime.now()
        dt_str = dt_str.strip()
        if " " in dt_str:
            return datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
        return datetime.datetime.strptime(dt_str, "%Y-%m-%d")

View File

@@ -1,15 +1,12 @@
import argparse
import datetime
import os
import sys
import subprocess
import datetime
import toml
import shlex # Used for properly quoting shell commands
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed
from importlib.metadata import version, PackageNotFoundError
from importlib.metadata import PackageNotFoundError, version
CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml")
from emulsion.config import ConfigLoader
from emulsion.executor import Executor
from emulsion.resolver import ValueResolver
def get_version():
@@ -19,296 +16,217 @@ def get_version():
return 'unknown'
def load_config():
    """Read the user's TOML config file, falling back to an empty dict."""
    if not os.path.isfile(CONFIG_PATH):
        return {}
    try:
        return toml.load(CONFIG_PATH)
    except Exception as e:
        print(f"Warning: Could not parse config file: {e}")
        return {}
def parse_args(config):
parser = argparse.ArgumentParser(description='A tool for updating exif tags')
parser.add_argument(
'files',
nargs='*',
help='Image files to process (e.g. *.jpg *.tif).'
def parse_args():
parser = argparse.ArgumentParser(
description='A tool for updating exif tags'
)
parser.add_argument(
'-v', '--version',
action='version',
version=f'%(prog)s {get_version()}'
'files', nargs='*', help='Image files to process (e.g. *.jpg *.tif).'
)
# Configurable fields
parser.add_argument('--author', default=None, help='Name of the photographer.')
parser.add_argument('--lab', default=None, help='Name of the lab who developed the film.')
parser.add_argument('--make', default=None, help='Camera make (stored in EXIF:Make).')
parser.add_argument('--model', default=None, help='Camera model (stored in EXIF:Model).')
parser.add_argument('--film', default=None, help='Film stock (stored in EXIF:UserComment and XMP:Description).')
# Time settings
parser.add_argument('--base-date', default=None, help='Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).')
parser.add_argument('--time-increment', type=int, default=None, help='Time increment in seconds between images.')
parser.add_argument('--embed', action='store_true', help='Embed EXIF data directly into the image file instead of a sidecar.')
parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without modifying files.')
parser.add_argument(
'-j', '--workers',
'-v', '--version', action='version', version=f'%(prog)s {get_version()}'
)
# --- First-Class Fields ---
parser.add_argument('--author', help='Name of the photographer.')
parser.add_argument('--lab', help='Name of the lab who developed the film.')
parser.add_argument('--make', help='Camera make.')
parser.add_argument('--model', help='Camera model.')
parser.add_argument('--lens', help='Lens model.')
parser.add_argument('--film', help='Film stock.')
# --- Generic/Custom Fields ---
parser.add_argument(
'--field',
action='append',
dest='custom_fields',
metavar='KEY=VALUE',
help=(
'Set a custom field defined in config (e.g., '
'--field location="Paris").'
),
)
# --- Process Control ---
parser.add_argument(
'--base-date',
default=None,
help=(
'Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).'
)
)
parser.add_argument(
'--time-increment',
type=int,
default=None,
help='Time increment in seconds between images.',
)
parser.add_argument(
'--embed',
action='store_true',
help=(
'Embed EXIF data directly into the image file instead of a '
'sidecar.'
)
)
parser.add_argument(
'--dry-run',
action='store_true',
help='Show what would be changed without modifying files.',
)
parser.add_argument(
'-j',
'--workers',
type=int,
nargs='?',
const=os.cpu_count() or 1,
default=os.cpu_count() or 1,
help='Number of parallel workers to run exiftool; defaults to number of CPUs.'
)
parser.add_argument('--init-config', action='store_true', help='Create a default config file (if none exists) and exit.')
args = parser.parse_args()
# Merge from config
keys = ['author', 'lab', 'make', 'model', 'film', 'time_increment']
for k in keys:
if getattr(args, k) is None and k in config:
setattr(args, k, config[k])
return args
def prompt_for_config(args):
    """Interactively fill in any metadata fields still unset on *args*."""
    optional_prompts = [
        ("lab", "Lab name (optional, enter to skip)? "),
        ("make", "Camera make (optional, enter to skip)? "),
        ("model", "Camera model (optional, enter to skip)? "),
        ("film", "Film stock (optional, enter to skip)? "),
    ]
    try:
        # Author is required, so an empty string also triggers the prompt.
        if not args.author:
            args.author = input("Photographer's name (Author)? ").strip()
        # Optional fields are only prompted when completely unset (None); an
        # explicit empty value from the CLI is respected.
        for attr, question in optional_prompts:
            if getattr(args, attr) is None:
                setattr(args, attr, input(question).strip())
        if not args.time_increment:
            dflt = "60"
            resp = input(f"Time increment in seconds [{dflt}]: ").strip()
            args.time_increment = int(resp) if resp else int(dflt)
    except KeyboardInterrupt:
        print("\nInterrupted by user. Exiting.")
        sys.exit(1)
def prompt_if_missing(args):
    """Run the standard field prompts, then insist on a parseable base date."""
    prompt_for_config(args)
    try:
        # Keep asking until the user supplies (or accepts) a date we can parse.
        while not args.base_date:
            dflt = datetime.datetime.now().strftime("%Y-%m-%d")
            answer = input(f"Base date/time for first image [{dflt}]: ").strip()
            candidate = answer if answer else dflt
            try:
                # Validate immediately so we don't crash later.
                parse_user_date(candidate)
            except ValueError:
                print("Invalid format. Please use 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.")
            else:
                args.base_date = candidate
    except KeyboardInterrupt:
        print("\nInterrupted by user. Exiting.")
        sys.exit(1)
def parse_user_date(dt_str):
    """Parse 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' into a datetime.

    A space in the (stripped) input selects the date+time format; anything
    that matches neither format raises ValueError.
    """
    cleaned = dt_str.strip()
    fmt = "%Y-%m-%d %H:%M:%S" if " " in cleaned else "%Y-%m-%d"
    return datetime.datetime.strptime(cleaned, fmt)
def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp,
                       sidecar_source=None):
    """
    Build the exiftool argument list for one file.

    file_path: the file exiftool writes to (image or sidecar).
    author: photographer name; always written to byline/copyright tags.
    lab, make, model, film: optional fields, skipped when falsy.
    timestamp: 'YYYY:MM:DD HH:MM:SS' string for DateTimeOriginal/CreateDate.
    sidecar_source: if set, we are creating/updating a sidecar FROM this
        source image via exiftool's -srcfile mechanism.
    """
    current_year = datetime.datetime.now().year
    cmd = [
        "exiftool",
        "-overwrite_original",
        f"-Artist={author}",
        f"-Creator={author}",
        f"-By-line={author}",
        f"-Credit={author}",
        f"-CopyrightNotice=© {current_year} {author}",
        f"-Copyright=© {current_year} {author}",
        f"-DateTimeOriginal={timestamp}",
        f"-CreateDate={timestamp}",
        # Both URL fields are explicitly cleared (empty assignment).
        "-WebStatement=",
        "-CreatorWorkURL="
    ]
    if lab:
        cmd.append(f"-XMP:DevelopedBy={lab}")
    if make:
        cmd.append(f"-Make={make}")
    if model:
        cmd.append(f"-Model={model}")
    if film:
        cmd.append(f"-UserComment={film}")
        cmd.append(f"-XMP:Description={film}")
    if sidecar_source:
        # Advanced ExifTool usage: read the source image but write to the
        # given sidecar file; works even if the sidecar doesn't exist yet.
        # Fix: '-srcfile' was an f-string with no placeholder (lint F541).
        cmd.extend(["-srcfile", file_path, sidecar_source])
    else:
        cmd.append(file_path)
    return cmd
def run_exiftool(cmd, dry_run=False):
"""
Unified execution logic.
Returns: (success_bool, message_str)
"""
if dry_run:
# Return the safe shell command string
safe_cmd = shlex.join(cmd)
return True, f"[DRY RUN] {safe_cmd}"
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
help=(
'Number of parallel workers to run exiftool; defaults to number '
'of CPUs.'
)
return True, "Updated"
except subprocess.CalledProcessError as e:
return False, f"Error: {e}"
except FileNotFoundError:
return False, "Error: 'exiftool' not found. Please install it."
)
parser.add_argument(
'--init-config',
action='store_true',
help='Create a default config file (if none exists) and exit.',
)
parser.add_argument(
'--no-interaction',
action='store_true',
help='Do not prompt for missing fields (skip them if missing).',
)
return parser.parse_args()
def create_config_file(args):
if os.path.exists(CONFIG_PATH):
print("Config file already exists. Not overwriting.")
sys.exit(0)
def prompt_for_defaults(config):
"""
Prompts the user for default values to populate the initial config.
"""
print("Initializing configuration. Press Enter to skip any field.")
# We'll iterate over the 'mappings' to find what fields are available,
# but we'll prioritize the 'core' ones for a better UX order.
core_fields = ['author', 'lab', 'make', 'model', 'lens', 'film']
mappings = config.get('mappings', {})
defaults = config.setdefault('defaults', {})
defaults = {
"author": args.author or "Your Name",
"lab": args.lab or "",
"make": args.make or "",
"model": args.model or "",
"film": args.film or "",
"time_increment": args.time_increment if args.time_increment else 60
}
# Filter empty
defaults = {k: v for k, v in defaults.items() if not (isinstance(v, str) and not v.strip())}
os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
toml.dump(defaults, f)
print(f"Created config file at {CONFIG_PATH}")
sys.exit(0)
# Prompt for core fields first
for field in core_fields:
if field in mappings:
schema = mappings[field]
help_text = schema.get('help', field) if isinstance(schema, dict) else field
val = input(f"Default {help_text} (optional): ").strip()
if val:
defaults[field] = val
# Time increment
dflt_inc = defaults.get('time_increment', 60)
val = input(f"Default Time Increment [seconds] ({dflt_inc}): ").strip()
if val:
try:
defaults['time_increment'] = int(val)
except ValueError:
print("Invalid number, keeping default.")
def main():
try:
config = load_config()
args = parse_args(config)
# 1. Load Config
loader = ConfigLoader()
config = loader.load()
# Defaults
if args.workers is None:
args.workers = os.cpu_count() or 1
if args.dry_run:
print("Dry run detected: Forcing sequential processing.")
args.workers = 1
# 2. Parse CLI
args = parse_args()
# Handle Initialization
if args.init_config:
prompt_for_config(args)
create_config_file(args)
if os.path.exists(loader.path):
print(f"Config file already exists at {loader.path}. Not overwriting.")
sys.exit(0)
# Prompt user for initial values
try:
prompt_for_defaults(config)
except KeyboardInterrupt:
print("\nAborted.")
sys.exit(1)
if not args.files:
print("No files provided.")
if loader.save_defaults(config):
print(f'Created config file at {loader.path}')
else:
# Should be caught by the check above, but for safety
print('Config file already exists. Not overwriting.')
sys.exit(0)
prompt_if_missing(args)
if not args.files:
print('No files provided.')
sys.exit(0)
try:
base_dt = parse_user_date(args.base_date)
except ValueError:
print(f"Error: Base date '{args.base_date}' must be 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'.")
# Handle Base Date Prompt logic
if not args.base_date and not args.no_interaction:
dflt = datetime.datetime.now().strftime('%Y-%m-%d')
resp = input(f'Base date/time for first image [{dflt}]: ').strip()
args.base_date = resp if resp else dflt
if not args.base_date:
print('Error: Base date is required.')
sys.exit(1)
files = sorted(args.files)
total_files = len(files)
time_increment = args.time_increment if args.time_increment else 60
# 3. Prepare Inputs for Resolver
# We need to mash --author and --field author=... into one dict
user_inputs = {}
print(f"Processing {total_files} file(s)...")
# First-Class args
for field in ['author', 'lab', 'make', 'model', 'lens', 'film']:
val = getattr(args, field, None)
if val:
user_inputs[field] = val
with alive_bar(total_files, title="Tagging files") as bar:
# 1. Prepare all tasks first
tasks = []
for i, f in enumerate(files):
ext = os.path.splitext(f)[1].lower()
if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']:
continue
ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment)
timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S")
sidecar_source = None
if not args.embed:
target_file_path = f"{f}.xmp"
# If sidecar doesn't exist, we must tell ExifTool to create it from the source image
if not os.path.exists(target_file_path):
sidecar_source = f
# Custom args
if args.custom_fields:
for item in args.custom_fields:
if '=' in item:
key, val = item.split('=', 1)
user_inputs[key.strip()] = val.strip()
else:
target_file_path = f
print(
f"Warning: Invalid format for --field '{item}'. "
"Expected KEY=VALUE."
)
cmd = build_exiftool_cmd(
file_path=target_file_path,
author=args.author,
lab=args.lab,
make=args.make,
model=args.model,
film=args.film,
timestamp=timestamp_str,
sidecar_source=sidecar_source
)
# 4. Resolve Metadata
resolver = ValueResolver(config)
resolved_values = resolver.resolve(
user_inputs, interactive=not args.no_interaction
)
tasks.append((cmd, f, timestamp_str))
# 5. Execute
executor = Executor(config)
with ThreadPoolExecutor(max_workers=args.workers) as executor:
# Submit all tasks
futures = {
executor.submit(run_exiftool, cmd, args.dry_run): (f, ts)
for cmd, f, ts in tasks
}
# We pass 'args' as the options object (has dry_run, workers, etc)
# Just need to make sure time_increment is resolved from config defaults
# if missing
if args.time_increment is None:
args.time_increment = config.get('defaults', {}).get(
'time_increment', 60
)
# Process results as they complete
for future in as_completed(futures):
original_file, ts = futures[future]
success, msg = future.result()
if args.dry_run:
print(msg)
elif not success:
bar.text(f"Failed {original_file}: {msg}")
else:
bar.text(f"Updated {original_file} => {ts}")
bar()
executor.run_batch(args.files, resolved_values, args)
except KeyboardInterrupt:
print("\nInterrupted by user. Exiting.")
print('\nInterrupted by user. Exiting.')
sys.exit(1)
if __name__ == '__main__':
main()
main()

86
src/emulsion/resolver.py Normal file
View File

@@ -0,0 +1,86 @@
import sys
class ValueResolver:
    """Combines config defaults, CLI input and interactive prompts into the
    final {field: value} mapping consumed by the executor."""

    def __init__(self, config):
        """
        config: the loaded configuration dictionary containing 'mappings'
        and 'defaults'.
        """
        self.mappings = config.get('mappings', {})
        self.defaults = config.get('defaults', {})

    def resolve(self, cli_args, interactive=True):
        """
        Resolve the final values for all fields.

        Precedence (lowest to highest): config defaults, then CLI-provided
        values. When 'interactive' is true, every mapping marked prompt=True
        that still lacks a value is asked for on stdin. Empty/falsy values
        are dropped from the returned dictionary.
        """
        # 'defaults' may mix settings (time_increment) with fields (author);
        # downstream consumers ignore keys they don't understand, so passing
        # everything through is safe.
        merged = dict(self.defaults)
        # Overlay CLI inputs (covers both --author and --field author=...),
        # skipping values that were never provided.
        merged.update(
            {key: val for key, val in cli_args.items() if val is not None}
        )
        if interactive:
            # Fields whose mapping requests a prompt and that are still
            # missing or empty, sorted for a stable prompt order.
            pending = sorted(
                name
                for name, schema in self.mappings.items()
                if isinstance(schema, dict)
                and schema.get('prompt', False)
                and not merged.get(name)
            )
            try:
                for name in pending:
                    self._prompt_user(name, merged)
            except KeyboardInterrupt:
                print("\nInterrupted. Exiting.")
                sys.exit(1)
        # Drop anything still None/empty for a cleaner result.
        return {key: val for key, val in merged.items() if val}

    def _prompt_user(self, field_name, resolved_dict):
        """Ask for one field's value on stdin; store it only if non-empty."""
        schema = self.mappings.get(field_name, {})
        if isinstance(schema, dict):
            label = schema.get('help', field_name)
        else:
            label = field_name
        answer = input(f"{label} (Optional): ").strip()
        if answer:
            resolved_dict[field_name] = answer