Refactor, add side-car and version

This commit is contained in:
Alexander Wainwright
2025-12-19 21:30:43 +10:00
parent 5525d309bf
commit 0c7f244a99

View File

@@ -4,12 +4,21 @@ import sys
import subprocess
import datetime
import toml
import shlex # Used for properly quoting shell commands
from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor, as_completed
from importlib.metadata import version, PackageNotFoundError
CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml")
def get_version():
try:
return version('emulsion')
except PackageNotFoundError:
return 'unknown'
def load_config():
if os.path.isfile(CONFIG_PATH):
try:
@@ -28,6 +37,12 @@ def parse_args(config):
help='Image files to process (e.g. *.jpg *.tif).'
)
parser.add_argument(
'-v', '--version',
action='version',
version=f'%(prog)s {get_version()}'
)
# Configurable fields
parser.add_argument('--author', default=None, help='Name of the photographer.')
parser.add_argument('--lab', default=None, help='Name of the lab who developed the film.')
@@ -38,6 +53,7 @@ def parse_args(config):
# Time settings
parser.add_argument('--base-date', default=None, help='Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).')
parser.add_argument('--time-increment', type=int, default=None, help='Time increment in seconds between images.')
parser.add_argument('--sidecar', action='store_true', help='Write to sidecar XMP files (e.g. image.jpg.xmp) instead of embedding.')
parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without modifying files.')
parser.add_argument('-j', '--workers', type=int, default=os.cpu_count() or 1, help='Number of parallel workers to run exiftool; defaults to number of CPUs.')
parser.add_argument('--init-config', action='store_true', help='Create a default config file (if none exists) and exit.')
@@ -45,97 +61,43 @@ def parse_args(config):
args = parser.parse_args()
# Merge from config
if args.author is None and 'author' in config:
args.author = config['author']
if args.lab is None and 'lab' in config:
args.lab = config['lab']
if args.make is None and 'make' in config:
args.make = config['make']
if args.model is None and 'model' in config:
args.model = config['model']
if args.film is None and 'film' in config:
args.film = config['film']
if args.time_increment is None and 'time_increment' in config:
args.time_increment = config['time_increment']
keys = ['author', 'lab', 'make', 'model', 'film', 'time_increment']
for k in keys:
if getattr(args, k) is None and k in config:
setattr(args, k, config[k])
return args
def prompt_for_config(args):
"""
Prompt for config-only fields before creating a config file.
(Base date is ephemeral, not stored in config.)
"""
try:
if not args.author:
args.author = input("Photographer's name (Author)? ").strip()
if args.lab is None:
resp = input("Lab name (optional, enter to skip)? ").strip()
args.lab = resp if resp else ""
args.lab = input("Lab name (optional, enter to skip)? ").strip()
if args.make is None:
resp = input("Camera make (optional, enter to skip)? ").strip()
args.make = resp if resp else ""
args.make = input("Camera make (optional, enter to skip)? ").strip()
if args.model is None:
resp = input("Camera model (optional, enter to skip)? ").strip()
args.model = resp if resp else ""
args.model = input("Camera model (optional, enter to skip)? ").strip()
if args.film is None:
resp = input("Film stock (optional, enter to skip)? ").strip()
args.film = resp if resp else ""
args.film = input("Film stock (optional, enter to skip)? ").strip()
if not args.time_increment:
dflt = "60"
resp = input(f"Time increment in seconds [{dflt}]: ").strip()
args.time_increment = int(resp) if resp else int(dflt)
except KeyboardInterrupt:
print("\nInterrupted by user. Exiting.")
sys.exit(1)
def prompt_if_missing(args):
"""
Prompt for ephemeral fields like base_date if missing,
and also fill in other fields if user didn't supply them.
"""
# Same prompts as config, plus base_date
prompt_for_config(args)
try:
if not args.author:
args.author = input("Photographer's name (Author)? ").strip()
if args.lab is None:
resp = input("Lab name (optional, enter to skip)? ").strip()
args.lab = resp if resp else ""
if args.make is None:
resp = input("Camera make (optional, enter to skip)? ").strip()
args.make = resp if resp else ""
if args.model is None:
resp = input("Camera model (optional, enter to skip)? ").strip()
args.model = resp if resp else ""
if args.film is None:
resp = input("Film stock (optional, enter to skip)? ").strip()
args.film = resp if resp else ""
if not args.base_date:
dflt = datetime.datetime.now().strftime("%Y-%m-%d")
resp = input(f"Base date/time for first image [{dflt}]: ").strip()
args.base_date = resp if resp else dflt
if not args.time_increment:
dflt = "60"
resp = input(f"Time increment in seconds [{dflt}]: ").strip()
args.time_increment = int(resp) if resp else int(dflt)
except KeyboardInterrupt:
print("\nInterrupted by user. Exiting.")
sys.exit(1)
@@ -149,60 +111,63 @@ def parse_user_date(dt_str):
return datetime.datetime.strptime(dt_str, "%Y-%m-%d")
def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp, dry_run=False):
def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp):
"""
Use standard EXIF fields:
- EXIF:Make (args.make)
- EXIF:Model (args.model)
- EXIF:UserComment (args.film)
Also store film in XMP:Description for better compatibility.
Builds the command list. Does NOT handle dry_run formatting.
Always returns a list for subprocess safety.
"""
current_year = datetime.datetime.now().year
cmd = [
"exiftool",
"-overwrite_original",
# Photographer info
f"-Artist={author}",
f"-Creator={author}",
f"-By-line={author}",
f"-Credit={author}",
f"-CopyrightNotice=© {current_year} {author}",
f"-Copyright=© {current_year} {author}",
# Timestamps
f"-DateTimeOriginal={timestamp}",
f"-CreateDate={timestamp}",
# Clear out some lab fields
"-WebStatement=",
"-CreatorWorkURL="
]
# Lab in XMP:DevelopedBy
if lab:
cmd.append(f"-XMP:DevelopedBy={lab}")
# If user gave a make, store it in EXIF:Make
if make:
cmd.append(f"-Make={make}")
# If user gave a model, store it in EXIF:Model
if model:
cmd.append(f"-Model={model}")
# If user gave a film stock, store it in EXIF:UserComment AND XMP:Description
if film:
cmd.append(f"-UserComment={film}")
cmd.append(f"-XMP:Description={film}")
cmd.append(file_path)
if dry_run:
return " ".join(cmd)
return cmd
def run_exiftool(cmd, dry_run=False):
"""
Unified execution logic.
Returns: (success_bool, message_str)
"""
if dry_run:
# Return the safe shell command string
safe_cmd = shlex.join(cmd)
return True, f"[DRY RUN] {safe_cmd}"
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
return True, "Updated"
except subprocess.CalledProcessError as e:
return False, f"Error: {e}"
def create_config_file(args):
if os.path.exists(CONFIG_PATH):
print("Config file already exists. Not overwriting.")
@@ -217,20 +182,12 @@ def create_config_file(args):
"time_increment": args.time_increment if args.time_increment else 60
}
# Remove empty values so user is prompted next time if they left something blank
keys_to_remove = []
for k, v in defaults.items():
if isinstance(v, str) and not v.strip():
keys_to_remove.append(k)
for k in keys_to_remove:
del defaults[k]
# Filter empty
defaults = {k: v for k, v in defaults.items() if not (isinstance(v, str) and not v.strip())}
os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)
with open(CONFIG_PATH, "w", encoding="utf-8") as f:
toml.dump(defaults, f)
print(f"Created config file at {CONFIG_PATH}")
sys.exit(0)
@@ -239,10 +196,18 @@ def main():
try:
config = load_config()
args = parse_args(config)
# Default to number of CPUs if workers not specified
# Defaults
if args.workers is None:
args.workers = os.cpu_count() or 1
# LOGIC SIMPLIFICATION 1:
# Instead of writing a separate sequential loop for dry-runs,
# we just force workers=1 here. The executor handles the rest.
if args.dry_run:
print("Dry run detected: Forcing sequential processing.")
args.workers = 1
if args.init_config:
prompt_for_config(args)
create_config_file(args)
@@ -262,94 +227,66 @@ def main():
files = sorted(args.files)
total_files = len(files)
time_increment = args.time_increment if args.time_increment else 60
current_dt = base_dt
print(f"Processing {total_files} file(s)...")
with alive_bar(total_files, title="Tagging files") as bar:
if args.workers > 1 and not args.dry_run:
executor = ThreadPoolExecutor(max_workers=args.workers)
futures = {}
supported_idx = 0
for f in files:
ext = os.path.splitext(f)[1].lower()
if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']:
bar.text(f"Skipping unsupported file: {f}")
bar()
continue
ts_dt = base_dt + datetime.timedelta(seconds=supported_idx * time_increment)
timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S")
cmd = build_exiftool_cmd(
file_path=f,
author=args.author,
lab=args.lab,
make=args.make,
model=args.model,
film=args.film,
timestamp=timestamp_str,
dry_run=False
)
future = executor.submit(
subprocess.run, cmd, check=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
)
futures[future] = (f, timestamp_str)
supported_idx += 1
# 1. Prepare all tasks first
tasks = []
for i, f in enumerate(files):
ext = os.path.splitext(f)[1].lower()
if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']:
continue
ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment)
timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S")
if args.sidecar:
target_file_path = f"{f}.xmp"
else:
target_file_path = f
cmd = build_exiftool_cmd(
file_path=target_file_path,
author=args.author,
lab=args.lab,
make=args.make,
model=args.model,
film=args.film,
timestamp=timestamp_str
)
tasks.append((cmd, f, timestamp_str))
with ThreadPoolExecutor(max_workers=args.workers) as executor:
# Submit all tasks
# Note: We pass args.dry_run into the function here
futures = {
executor.submit(run_exiftool, cmd, args.dry_run): (f, ts)
for cmd, f, ts in tasks
}
# Process results as they complete
for future in as_completed(futures):
f, ts = futures[future]
try:
future.result()
bar.text(f"Updated {f} => {ts}")
except subprocess.CalledProcessError as e:
bar.text(f"Failed to update {f}: {e}")
bar()
executor.shutdown(wait=True)
else:
supported_idx = 0
current_dt = base_dt
for f in files:
ext = os.path.splitext(f.lower())[1]
if ext not in ['.jpg', '.jpeg', '.tif', '.tiff']:
bar.text(f"Skipping unsupported file: {f}")
bar()
continue
timestamp_str = current_dt.strftime("%Y:%m:%d %H:%M:%S")
cmd = build_exiftool_cmd(
file_path=f,
author=args.author,
lab=args.lab,
make=args.make,
model=args.model,
film=args.film,
timestamp=timestamp_str,
dry_run=args.dry_run
)
original_file, ts = futures[future]
success, msg = future.result()
if args.dry_run:
bar.text(f"DRY RUN: {cmd}")
# For dry run, we PRINT the command so the user can copy it
# bar.text() is transient, print() persists in terminal
print(msg)
elif not success:
bar.text(f"Failed {original_file}: {msg}")
else:
try:
subprocess.run(
cmd,
check=True,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
bar.text(f"Updated {f} => {timestamp_str}")
except subprocess.CalledProcessError as e:
bar.text(f"Failed to update {f}: {e}")
bar.text(f"Updated {original_file} => {ts}")
current_dt += datetime.timedelta(seconds=time_increment)
bar()
except KeyboardInterrupt:
print("\nInterrupted by user. Exiting.")
sys.exit(1)
if __name__ == '__main__':
main()