Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b886d26948 | ||
|
|
35e6410b2b | ||
|
|
6c00b8e733 | ||
|
|
3d8063d984 | ||
|
|
c1b031f29e | ||
|
|
91cc408d34 | ||
|
|
37fbce61c9 | ||
|
|
6536bf43de | ||
|
|
f188dd04de | ||
|
|
c91a151a2b | ||
|
|
0c7f244a99 | ||
|
|
5525d309bf |
@@ -4,12 +4,13 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "emulsion"
|
||||
version = "0.1.0"
|
||||
version = "0.1.2"
|
||||
description = "A tool for updating exif tags"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"toml",
|
||||
"alive-progress",
|
||||
"types-toml",
|
||||
]
|
||||
authors = [
|
||||
{name = "Alexander Wainwright", email = "code@figtree.dev"},
|
||||
@@ -27,3 +28,43 @@ where = ["src"]
|
||||
|
||||
[project.scripts]
|
||||
emulsion = "emulsion.main:main"
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 80
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = [
|
||||
"B",
"ANN",
"FIX",
"S",
"F", # Pyflakes rules
"W", # PyCodeStyle warnings
|
||||
"E", # PyCodeStyle errors
|
||||
"I", # Sort imports "properly"
|
||||
"UP", # Warn if certain things can changed due to newer Python versions
|
||||
"C4", # Catch incorrect use of comprehensions, dict, list, etc
|
||||
"FA", # Enforce from __future__ import annotations
|
||||
"ISC", # Good use of string concatenation
|
||||
"ICN", # Use common import conventions
|
||||
"RET", # Good return practices
|
||||
"SIM", # Common simplification rules
|
||||
"TID", # Some good import practices
|
||||
"TC", # Enforce importing certain types in a TYPE_CHECKING block
|
||||
"PTH", # Use pathlib instead of os.path
|
||||
"NPY", # Some numpy-specific things
|
||||
]
|
||||
|
||||
ignore = [
|
||||
"W191",
|
||||
"E101", # allow spaces for alignment
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["E402"]
|
||||
"**/{tests,docs,tools}/*" = ["E402"]
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = "single"
|
||||
indent-style = "tab"
|
||||
|
||||
124
src/emulsion/config.py
Normal file
124
src/emulsion/config.py
Normal file
@@ -0,0 +1,124 @@
|
||||
import copy
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import toml
|
||||
|
||||
# Default Schema
# This defines the "First Class" feel of the app, but is fully overridable.
DEFAULT_CONFIG: dict[str, Any] = {
    # Sidecar files are written next to the image with this extension.
    "sidecar": {"extension": ".xmp"},
    # Fallback values for settings the user hasn't configured.
    "defaults": {"time_increment": 60},
    # Each mapping: field name -> exiftool flag templates; "{value}" and
    # "{year}" are substituted at command-build time.
    "mappings": {
        "author": {
            "flags": [
                "-Artist={value}",
                "-Creator={value}",
                "-By-line={value}",
                "-Credit={value}",
                "-CopyrightNotice=© {year} {value}",
                "-Copyright=© {year} {value}",
            ],
            "prompt": True,
            "help": "Name of the photographer",
        },
        "lab": {
            "flags": ["-XMP:DevelopedBy={value}"],
            "prompt": True,
            "help": "Lab name",
        },
        "make": {
            "flags": ["-Make={value}"],
            "prompt": True,
            "help": "Camera make",
        },
        "model": {
            "flags": ["-Model={value}"],
            "prompt": True,
            "help": "Camera model",
        },
        "lens": {
            "flags": ["-LensModel={value}", "-Lens={value}"],
            "prompt": True,
            "help": "Lens model",
        },
        "film": {
            "flags": ["-UserComment={value}", "-XMP:Description={value}"],
            "prompt": False,
            "help": "Film stock",
        },
    },
}
|
||||
|
||||
|
||||
def get_config_path() -> Path:
    """Return the path of the user's config file.

    Honours ``$XDG_CONFIG_HOME`` when set, otherwise falls back to the
    conventional ``~/.config`` directory.
    """
    base = os.environ.get(
        'XDG_CONFIG_HOME', str(Path('~/.config').expanduser())
    )
    return Path(base) / 'emulsion' / 'config.toml'
|
||||
|
||||
|
||||
class ConfigLoader:
|
||||
def __init__(self, path: Path | None = None) -> None:
|
||||
self.path: Path = path or get_config_path()
|
||||
self.config: dict[str, Any] = copy.deepcopy(DEFAULT_CONFIG)
|
||||
|
||||
def load(self) -> dict[str, Any]:
|
||||
"""
|
||||
Loads the config from disk and merges it into the defaults.
|
||||
Returns the full config dictionary.
|
||||
"""
|
||||
if self.path.is_file():
|
||||
try:
|
||||
user_config = toml.load(self.path)
|
||||
self._merge(self.config, user_config)
|
||||
except Exception as e:
|
||||
# We might want to let the caller handle this, or just print
|
||||
# warning
|
||||
print(
|
||||
f'Warning: Could not parse config file at {self.path}: {e}'
|
||||
)
|
||||
|
||||
return self.config
|
||||
|
||||
def _merge(self, base: dict[str, Any], update: dict[str, Any]) -> None:
|
||||
"""
|
||||
Recursively merges 'update' dict into 'base' dict.
|
||||
"""
|
||||
for key, value in update.items():
|
||||
if (
|
||||
isinstance(value, dict)
|
||||
and key in base
|
||||
and isinstance(base[key], dict)
|
||||
):
|
||||
self._merge(base[key], value)
|
||||
else:
|
||||
base[key] = value
|
||||
|
||||
def save_defaults(self, current_defaults: dict[str, Any]) -> bool:
|
||||
"""
|
||||
Helpers to write a new config file (for --init-config). This is a bit
|
||||
tricky because we don't want to just dump the massive DEFAULT_CONFIG. We
|
||||
likely want to write a file that reflects what the user *currently* has
|
||||
+ defaults.
|
||||
"""
|
||||
# For now, simplistic implementation: Dump the merged state.
|
||||
# In a real app, we might want to preserve comments etc, but TOML lib
|
||||
# doesn't do that.
|
||||
|
||||
# Ensure directory exists
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# We probably only want to write if it doesn't exist, to avoid
|
||||
# clobbering.
|
||||
if self.path.exists():
|
||||
return False
|
||||
|
||||
with self.path.open('w', encoding='utf-8') as f:
|
||||
toml.dump(current_defaults, f)
|
||||
return True
|
||||
201
src/emulsion/executor.py
Normal file
201
src/emulsion/executor.py
Normal file
@@ -0,0 +1,201 @@
|
||||
import datetime
|
||||
import shlex
|
||||
import subprocess
|
||||
from argparse import Namespace
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from alive_progress import alive_bar
|
||||
|
||||
|
||||
class Executor:
|
||||
def __init__(self, config: dict[str, Any]) -> None:
|
||||
self.config = config
|
||||
self.mappings: dict[str, Any] = config.get('mappings', {})
|
||||
self.sidecar_ext: str = config.get(
|
||||
'sidecar', {}).get('extension', '.xmp')
|
||||
if not self.sidecar_ext.startswith('.'):
|
||||
self.sidecar_ext = f".{self.sidecar_ext}"
|
||||
|
||||
def run_batch(
|
||||
self, files: list[str],
|
||||
resolved_values: dict[str, str],
|
||||
options: Namespace
|
||||
) -> None:
|
||||
"""
|
||||
Main execution entry point.
|
||||
files: List of file paths
|
||||
resolved_values: Dictionary of final field values
|
||||
options: Dictionary/Namespace of process options (dry_run, workers,
|
||||
base_date, etc.)
|
||||
"""
|
||||
# Filter supported files
|
||||
extensions = ['.jpg', '.jpeg', '.tif', '.tiff']
|
||||
valid_files = [
|
||||
f for f in files
|
||||
if Path(f).suffix.lower() in extensions
|
||||
]
|
||||
|
||||
if not valid_files:
|
||||
print("No valid image files found to process.")
|
||||
return
|
||||
|
||||
# Sort for consistent time incrementing
|
||||
valid_files.sort()
|
||||
total_files = len(valid_files)
|
||||
|
||||
# Parse base date
|
||||
try:
|
||||
base_dt = self._parse_date(options.base_date)
|
||||
except ValueError:
|
||||
print(
|
||||
f"Error: Base date '{options.base_date}' must be "
|
||||
"'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS'."
|
||||
)
|
||||
return
|
||||
|
||||
time_increment = options.time_increment or 60
|
||||
workers = options.workers or 1
|
||||
dry_run = options.dry_run
|
||||
|
||||
print(f"Processing {total_files} file(s)...")
|
||||
|
||||
# Prepare tasks
|
||||
tasks: list[tuple[list[str], str, str]] = []
|
||||
for i, f in enumerate(valid_files):
|
||||
# Calculate timestamp
|
||||
ts_dt = base_dt + datetime.timedelta(seconds=i * time_increment)
|
||||
timestamp_str = ts_dt.strftime("%Y:%m:%d %H:%M:%S")
|
||||
|
||||
# Determine file targets (Sidecar logic)
|
||||
target_path, sidecar_source = self._determine_paths(
|
||||
Path(f), options.embed
|
||||
)
|
||||
|
||||
# Build Command
|
||||
cmd = self._build_cmd(
|
||||
target_path,
|
||||
resolved_values,
|
||||
timestamp_str,
|
||||
sidecar_source
|
||||
)
|
||||
tasks.append((cmd, f, timestamp_str))
|
||||
|
||||
# Execute
|
||||
with (
|
||||
alive_bar(total_files, title="Tagging files") as bar,
|
||||
ThreadPoolExecutor(max_workers=workers) as executor
|
||||
):
|
||||
futures = {
|
||||
executor.submit(self._run_exiftool, cmd, dry_run): (f, ts)
|
||||
for cmd, f, ts in tasks
|
||||
}
|
||||
|
||||
for future in as_completed(futures):
|
||||
original_file, ts = futures[future]
|
||||
success, msg = future.result()
|
||||
|
||||
if dry_run:
|
||||
print(msg)
|
||||
elif not success:
|
||||
bar.text(f"Failed {original_file}: {msg}")
|
||||
else:
|
||||
bar.text(f"Updated {original_file} => {ts}")
|
||||
|
||||
bar()
|
||||
|
||||
def _determine_paths(
|
||||
self, original_file: Path, embed: bool
|
||||
) -> tuple[Path, Path | None]:
|
||||
"""
|
||||
Returns (target_path, sidecar_source_if_needed)
|
||||
"""
|
||||
if embed:
|
||||
return Path(original_file), None
|
||||
|
||||
target_path = Path(f'{original_file}{self.sidecar_ext}')
|
||||
|
||||
# If sidecar doesn't exist, we need to tell ExifTool to read from source
|
||||
# and write to the new sidecar file.
|
||||
if not Path(target_path).exists():
|
||||
return target_path, original_file
|
||||
|
||||
return target_path, None
|
||||
|
||||
def _build_cmd(
|
||||
self,
|
||||
file_path: Path,
|
||||
field_values: dict[str, str],
|
||||
timestamp_str: str,
|
||||
sidecar_source: Path | None = None
|
||||
) -> list[str]:
|
||||
current_year = datetime.datetime.now().year
|
||||
|
||||
# Core setup
|
||||
cmd = [
|
||||
"exiftool",
|
||||
"-overwrite_original",
|
||||
f"-DateTimeOriginal={timestamp_str}",
|
||||
f"-CreateDate={timestamp_str}",
|
||||
"-WebStatement=",
|
||||
"-CreatorWorkURL="
|
||||
]
|
||||
|
||||
# Add mapped fields
|
||||
for field_name, val in field_values.items():
|
||||
if field_name in self.mappings:
|
||||
schema = self.mappings[field_name]
|
||||
# Schema can be dict (new style) or list (old style/simple)
|
||||
flags = (
|
||||
schema.get('flags', [])
|
||||
if isinstance(schema, dict)
|
||||
else schema
|
||||
)
|
||||
|
||||
# Ensure flags is a list (just in case)
|
||||
if isinstance(flags, list):
|
||||
for flag in flags:
|
||||
safe_flag = flag.replace(
|
||||
'{value}',
|
||||
str(val)).replace('{year}', str(current_year)
|
||||
)
|
||||
cmd.append(safe_flag)
|
||||
|
||||
# Sidecar handling
|
||||
if sidecar_source:
|
||||
# -srcfile SOURCE TARGET
|
||||
cmd.append("-srcfile")
|
||||
cmd.append(str(file_path))
|
||||
cmd.append(str(sidecar_source))
|
||||
else:
|
||||
cmd.append(str(file_path))
|
||||
|
||||
return cmd
|
||||
|
||||
def _run_exiftool(self, cmd: list[str], dry_run: bool) -> tuple[bool, str]:
|
||||
if dry_run:
|
||||
safe_cmd = shlex.join(cmd)
|
||||
return True, f"[DRY RUN] {safe_cmd}"
|
||||
|
||||
try:
|
||||
subprocess.run( # noqa: S603
|
||||
cmd,
|
||||
check=True,
|
||||
stdout=subprocess.DEVNULL,
|
||||
stderr=subprocess.DEVNULL
|
||||
)
|
||||
return True, "Updated"
|
||||
except subprocess.CalledProcessError as e:
|
||||
return False, f"Error: {e}"
|
||||
except FileNotFoundError:
|
||||
return False, "Error: 'exiftool' not found. Please install it."
|
||||
|
||||
def _parse_date(self, dt_str: str | None) -> datetime.datetime:
|
||||
if not dt_str:
|
||||
# Should be handled by resolver/validator, but safe fallback
|
||||
return datetime.datetime.now()
|
||||
dt_str = dt_str.strip()
|
||||
if " " in dt_str:
|
||||
return datetime.datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
|
||||
return datetime.datetime.strptime(dt_str, "%Y-%m-%d")
|
||||
@@ -1,352 +1,235 @@
|
||||
import argparse
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
import subprocess
|
||||
import datetime
|
||||
import toml
|
||||
from alive_progress import alive_bar
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from importlib.metadata import PackageNotFoundError, version
|
||||
from typing import Any
|
||||
|
||||
CONFIG_PATH = os.path.expanduser("~/.config/emulsion/config.toml")
|
||||
from emulsion.config import ConfigLoader
|
||||
from emulsion.executor import Executor
|
||||
from emulsion.resolver import ValueResolver
|
||||
|
||||
|
||||
def load_config():
    """Read the user's TOML config; return {} if absent or unparseable."""
    if not os.path.isfile(CONFIG_PATH):
        return {}
    try:
        return toml.load(CONFIG_PATH)
    except Exception as e:
        print(f"Warning: Could not parse config file: {e}")
        return {}
|
||||
def get_version() -> str:
    """Return the installed package version, or 'unknown' outside an install."""
    try:
        pkg_version = version('emulsion')
    except PackageNotFoundError:
        pkg_version = 'unknown'
    return pkg_version
|
||||
|
||||
|
||||
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse sys.argv."""
    parser = argparse.ArgumentParser(
        description='A tool for updating exif tags'
    )

    parser.add_argument(
        'files', nargs='*', help='Image files to process (e.g. *.jpg *.tif).'
    )

    parser.add_argument(
        '-v', '--version', action='version', version=f'%(prog)s {get_version()}'
    )

    # --- First-Class Fields ---
    parser.add_argument('--author', help='Name of the photographer.')
    parser.add_argument('--lab', help='Name of the lab who developed the film.')
    parser.add_argument('--make', help='Camera make.')
    parser.add_argument('--model', help='Camera model.')
    parser.add_argument('--lens', help='Lens model.')
    parser.add_argument('--film', help='Film stock.')

    # --- Generic/Custom Fields ---
    parser.add_argument(
        '--field',
        action='append',
        dest='custom_fields',
        metavar='KEY=VALUE',
        help=(
            'Set a custom field defined in config (e.g., '
            '--field location="Paris").'
        ),
    )

    # --- Process Control ---
    parser.add_argument(
        '--base-date',
        default=None,
        help=(
            'Base date or date/time (e.g. 2023-04-10 or 2023-04-10 12:00:00).'
        ),
    )

    parser.add_argument(
        '--time-increment',
        type=int,
        default=None,
        help='Time increment in seconds between images.',
    )

    parser.add_argument(
        '--embed',
        action='store_true',
        help=(
            'Embed EXIF data directly into the image file instead of a sidecar.'
        ),
    )

    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be changed without modifying files.',
    )

    parser.add_argument(
        '-j',
        '--workers',
        type=int,
        nargs='?',
        const=os.cpu_count() or 1,
        default=os.cpu_count() or 1,
        help=(
            'Number of parallel workers to run exiftool; defaults to number '
            'of CPUs.'
        ),
    )

    parser.add_argument(
        '--init-config',
        action='store_true',
        help='Create a default config file (if none exists) and exit.',
    )

    parser.add_argument(
        '--no-interaction',
        action='store_true',
        help='Do not prompt for missing fields (skip them if missing).',
    )

    return parser.parse_args()
|
||||
|
||||
|
||||
def prompt_for_defaults(config: dict[str, Any]) -> None:
    """
    Prompts the user for default values to populate the initial config.
    """
    print('Initializing configuration. Press Enter to skip any field.')

    # Iterate over the configured 'mappings', but prompt the core fields
    # first for a friendlier ordering.
    core_fields = ['author', 'lab', 'make', 'model', 'lens', 'film']
    mappings = config.get('mappings', {})
    defaults = config.setdefault('defaults', {})

    for field in core_fields:
        if field not in mappings:
            continue
        schema = mappings[field]
        if isinstance(schema, dict):
            help_text = schema.get('help', field)
        else:
            help_text = field
        answer = input(f'Default {help_text} (optional): ').strip()
        if answer:
            defaults[field] = answer

    # Time increment (numeric, keeps its current default when blank/bad)
    dflt_inc = defaults.get('time_increment', 60)
    answer = input(f'Default Time Increment [seconds] ({dflt_inc}): ').strip()
    if answer:
        try:
            defaults['time_increment'] = int(answer)
        except ValueError:
            print('Invalid number, keeping default.')
|
||||
|
||||
|
||||
def main() -> None:
|
||||
try:
|
||||
if not args.author:
|
||||
args.author = input("Photographer's name (Author)? ").strip()
|
||||
# 1. Load Config
|
||||
loader = ConfigLoader()
|
||||
config = loader.load()
|
||||
|
||||
if args.lab is None:
|
||||
resp = input("Lab name (optional, enter to skip)? ").strip()
|
||||
args.lab = resp if resp else ""
|
||||
|
||||
if args.make is None:
|
||||
resp = input("Camera make (optional, enter to skip)? ").strip()
|
||||
args.make = resp if resp else ""
|
||||
|
||||
if args.model is None:
|
||||
resp = input("Camera model (optional, enter to skip)? ").strip()
|
||||
args.model = resp if resp else ""
|
||||
|
||||
if args.film is None:
|
||||
resp = input("Film stock (optional, enter to skip)? ").strip()
|
||||
args.film = resp if resp else ""
|
||||
|
||||
if not args.time_increment:
|
||||
dflt = "60"
|
||||
resp = input(f"Time increment in seconds [{dflt}]: ").strip()
|
||||
args.time_increment = int(resp) if resp else int(dflt)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\nInterrupted by user. Exiting.")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def prompt_if_missing(args):
    """
    Prompt for ephemeral fields like base_date if missing,
    and also fill in other fields if user didn't supply them.
    """
    try:
        if not args.author:
            args.author = input("Photographer's name (Author)? ").strip()

        # Optional free-text fields: an empty answer stores "".
        optional_prompts = (
            ('lab', "Lab name (optional, enter to skip)? "),
            ('make', "Camera make (optional, enter to skip)? "),
            ('model', "Camera model (optional, enter to skip)? "),
            ('film', "Film stock (optional, enter to skip)? "),
        )
        for name, question in optional_prompts:
            if getattr(args, name) is None:
                setattr(args, name, input(question).strip())

        if not args.base_date:
            today = datetime.datetime.now().strftime("%Y-%m-%d")
            answer = input(f"Base date/time for first image [{today}]: ").strip()
            args.base_date = answer if answer else today

        if not args.time_increment:
            answer = input("Time increment in seconds [60]: ").strip()
            args.time_increment = int(answer) if answer else 60

    except KeyboardInterrupt:
        print("\nInterrupted by user. Exiting.")
        sys.exit(1)
|
||||
|
||||
|
||||
def parse_user_date(dt_str):
    """Parse 'YYYY-MM-DD' or 'YYYY-MM-DD HH:MM:SS' into a datetime.

    Raises ValueError when the (stripped) string matches neither format.
    """
    cleaned = dt_str.strip()
    fmt = "%Y-%m-%d %H:%M:%S" if " " in cleaned else "%Y-%m-%d"
    return datetime.datetime.strptime(cleaned, fmt)
|
||||
|
||||
|
||||
def build_exiftool_cmd(file_path, author, lab, make, model, film, timestamp, dry_run=False):
    """
    Build the exiftool argument list for a single file.

    Use standard EXIF fields:
        - EXIF:Make (make)
        - EXIF:Model (model)
        - EXIF:UserComment (film)
    Also store film in XMP:Description for better compatibility.

    Returns the command as a list, or — when dry_run is True — a
    shell-quoted string suitable for display.
    """
    current_year = datetime.datetime.now().year
    cmd = [
        "exiftool",
        "-overwrite_original",

        # Photographer info
        f"-Artist={author}",
        f"-Creator={author}",
        f"-By-line={author}",
        f"-Credit={author}",
        f"-CopyrightNotice=© {current_year} {author}",
        f"-Copyright=© {current_year} {author}",

        # Timestamps
        f"-DateTimeOriginal={timestamp}",

        # Clear out some lab fields
        "-WebStatement=",
        "-CreatorWorkURL="
    ]

    # Lab in XMP:DevelopedBy
    if lab:
        cmd.append(f"-XMP:DevelopedBy={lab}")

    # If user gave a make, store it in EXIF:Make
    if make:
        cmd.append(f"-Make={make}")

    # If user gave a model, store it in EXIF:Model
    if model:
        cmd.append(f"-Model={model}")

    # If user gave a film stock, store it in EXIF:UserComment AND
    # XMP:Description
    if film:
        cmd.append(f"-UserComment={film}")
        cmd.append(f"-XMP:Description={film}")

    cmd.append(file_path)

    if dry_run:
        # FIX: shlex.join quotes paths/values containing spaces so the
        # preview is unambiguous and copy-pasteable into a shell; the old
        # " ".join produced a misleading command line.
        import shlex
        return shlex.join(cmd)
    return cmd
|
||||
|
||||
|
||||
def create_config_file(args):
    """
    Write a new config file at CONFIG_PATH from the CLI/prompted values,
    then exit the process. Refuses to overwrite an existing file.
    """
    if os.path.exists(CONFIG_PATH):
        print("Config file already exists. Not overwriting.")
        sys.exit(0)

    defaults = {
        "author": args.author or "Your Name",
        "lab": args.lab or "",
        "make": args.make or "",
        "model": args.model or "",
        "film": args.film or "",
        "time_increment": args.time_increment if args.time_increment else 60
    }

    # Drop blank string values so the user is prompted next time for
    # anything they left empty (dict comprehension replaces the old
    # collect-then-delete loop).
    defaults = {
        k: v for k, v in defaults.items()
        if not (isinstance(v, str) and not v.strip())
    }

    os.makedirs(os.path.dirname(CONFIG_PATH), exist_ok=True)

    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        toml.dump(defaults, f)

    print(f"Created config file at {CONFIG_PATH}")
    sys.exit(0)
|
||||
|
||||
|
||||
def main() -> None:
    """CLI entry point: load config, parse args, resolve values, execute."""
    try:
        # 1. Load Config
        loader = ConfigLoader()
        config = loader.load()

        # 2. Parse CLI
        args = parse_args()

        # Handle Initialization
        if args.init_config:
            if loader.path.exists():
                print(
                    f'Config file already exists at {loader.path}. Not '
                    'overwriting.'
                )
                sys.exit(0)

            # Prompt user for initial values
            try:
                prompt_for_defaults(config)
            except KeyboardInterrupt:
                print('\nAborted.')
                sys.exit(1)

            if loader.save_defaults(config):
                print(f'Created config file at {loader.path}')
            else:
                # Should be caught by the check above, but for safety
                print('Config file already exists. Not overwriting.')
            sys.exit(0)

        if not args.files:
            print('No files provided.')
            sys.exit(0)

        # Handle Base Date Prompt logic
        if not args.base_date and not args.no_interaction:
            dflt = datetime.datetime.now().strftime('%Y-%m-%d')
            resp = input(f'Base date/time for first image [{dflt}]: ').strip()
            args.base_date = resp if resp else dflt

        if not args.base_date:
            print('Error: Base date is required.')
            sys.exit(1)

        # 3. Prepare Inputs for Resolver
        # --author style flags and --field author=... merge into one dict.
        user_inputs: dict[str, Any] = {}

        # First-Class args
        for field in ['author', 'lab', 'make', 'model', 'lens', 'film']:
            val = getattr(args, field, None)
            if val:
                user_inputs[field] = val

        # Custom args
        if args.custom_fields:
            for item in args.custom_fields:
                if '=' in item:
                    key, val = item.split('=', 1)
                    user_inputs[key.strip()] = val.strip()
                else:
                    print(
                        f"Warning: Invalid format for --field '{item}'. "
                        'Expected KEY=VALUE.'
                    )

        # 4. Resolve Metadata
        resolver = ValueResolver(config)
        resolved_values = resolver.resolve(
            user_inputs, interactive=not args.no_interaction
        )

        # 5. Execute
        executor = Executor(config)

        # time_increment falls back to the config default when not given.
        if args.time_increment is None:
            args.time_increment = config.get('defaults', {}).get(
                'time_increment', 60
            )

        executor.run_batch(args.files, resolved_values, args)

    except KeyboardInterrupt:
        print('\nInterrupted by user. Exiting.')
        sys.exit(1)
|
||||
|
||||
|
||||
|
||||
90
src/emulsion/resolver.py
Normal file
90
src/emulsion/resolver.py
Normal file
@@ -0,0 +1,90 @@
|
||||
import sys
|
||||
from typing import Any
|
||||
|
||||
|
||||
class ValueResolver:
    """Combines config defaults, CLI input and interactive prompts into the
    final set of field values."""

    def __init__(self, config: dict[str, Any]) -> None:
        """
        config: The loaded configuration dictionary containing 'mappings' and
        'defaults'.
        """
        self.mappings: dict[str, Any] = config.get('mappings', {})
        self.defaults: dict[str, Any] = config.get('defaults', {})

    def resolve(
        self, cli_args: dict[str, Any], interactive: bool = True
    ) -> dict[str, str]:
        """
        Resolve the final values for all fields.

        Precedence: config defaults < CLI arguments < interactive answers.
        Fields whose mapping sets prompt=True and that still have no value
        are prompted for (when interactive). Empty values are dropped and
        everything is stringified.
        """
        # Defaults first. The executor ignores keys it doesn't understand,
        # so settings like time_increment may ride along harmlessly.
        merged: dict[str, Any] = dict(self.defaults)

        # Overlay CLI values; None means "not given on the command line".
        merged.update(
            (key, value)
            for key, value in cli_args.items()
            if value is not None
        )

        if interactive:
            # Fields whose mapping requests a prompt and that are still
            # missing/empty; sorted for a stable prompt order.
            missing = sorted(
                name
                for name, schema in self.mappings.items()
                if isinstance(schema, dict)
                and schema.get('prompt', False)
                and not merged.get(name)
            )

            try:
                for name in missing:
                    self._prompt_user(name, merged)
            except KeyboardInterrupt:
                print("\nInterrupted. Exiting.")
                sys.exit(1)

        # Drop empties, stringify the rest.
        return {key: str(value) for key, value in merged.items() if value}

    def _prompt_user(
        self, field_name: str, resolved_dict: dict[str, Any]
    ) -> None:
        """
        Ask for a single field's value; store it only if non-empty.
        """
        schema = self.mappings.get(field_name, {})
        if isinstance(schema, dict):
            label = schema.get('help', field_name)
        else:
            label = field_name

        answer = input(f"{label} (Optional): ").strip()
        if answer:
            resolved_dict[field_name] = answer
|
||||
17
uv.lock
generated
17
uv.lock
generated
@@ -1,13 +1,14 @@
|
||||
version = 1
|
||||
revision = 3
|
||||
requires-python = ">=3.10"
|
||||
|
||||
[[package]]
|
||||
name = "about-time"
|
||||
version = "4.2.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/1c/3f/ccb16bdc53ebb81c1bf837c1ee4b5b0b69584fd2e4a802a2a79936691c0a/about-time-4.2.1.tar.gz", hash = "sha256:6a538862d33ce67d997429d14998310e1dbfda6cb7d9bbfbf799c4709847fece", size = 15380 }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/1c/3f/ccb16bdc53ebb81c1bf837c1ee4b5b0b69584fd2e4a802a2a79936691c0a/about-time-4.2.1.tar.gz", hash = "sha256:6a538862d33ce67d997429d14998310e1dbfda6cb7d9bbfbf799c4709847fece", size = 15380, upload-time = "2022-12-21T04:15:54.991Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/fb/cd/7ee00d6aa023b1d0551da0da5fee3bc23c3eeea632fbfc5126d1fec52b7e/about_time-4.2.1-py3-none-any.whl", hash = "sha256:8bbf4c75fe13cbd3d72f49a03b02c5c7dca32169b6d49117c257e7eb3eaee341", size = 13295 },
|
||||
{ url = "https://files.pythonhosted.org/packages/fb/cd/7ee00d6aa023b1d0551da0da5fee3bc23c3eeea632fbfc5126d1fec52b7e/about_time-4.2.1-py3-none-any.whl", hash = "sha256:8bbf4c75fe13cbd3d72f49a03b02c5c7dca32169b6d49117c257e7eb3eaee341", size = 13295, upload-time = "2022-12-21T04:15:53.613Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -18,14 +19,14 @@ dependencies = [
|
||||
{ name = "about-time" },
|
||||
{ name = "grapheme" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/28/66/c2c1e6674b3b7202ce529cf7d9971c93031e843b8e0c86a85f693e6185b8/alive-progress-3.2.0.tar.gz", hash = "sha256:ede29d046ff454fe56b941f686f89dd9389430c4a5b7658e445cb0b80e0e4deb", size = 113231 }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/28/66/c2c1e6674b3b7202ce529cf7d9971c93031e843b8e0c86a85f693e6185b8/alive-progress-3.2.0.tar.gz", hash = "sha256:ede29d046ff454fe56b941f686f89dd9389430c4a5b7658e445cb0b80e0e4deb", size = 113231, upload-time = "2024-10-26T04:22:31.4Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/57/39/cade3a5a97fffa3ae84f298208237b3a9f7112d6b0ed57e8ff4b755e44b4/alive_progress-3.2.0-py3-none-any.whl", hash = "sha256:0677929f8d3202572e9d142f08170b34dbbe256cc6d2afbf75ef187c7da964a8", size = 77106 },
|
||||
{ url = "https://files.pythonhosted.org/packages/57/39/cade3a5a97fffa3ae84f298208237b3a9f7112d6b0ed57e8ff4b755e44b4/alive_progress-3.2.0-py3-none-any.whl", hash = "sha256:0677929f8d3202572e9d142f08170b34dbbe256cc6d2afbf75ef187c7da964a8", size = 77106, upload-time = "2024-10-26T04:22:29.103Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "emulsion"
|
||||
version = "0.1.0"
|
||||
version = "0.1.2"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "alive-progress" },
|
||||
@@ -42,13 +43,13 @@ requires-dist = [
|
||||
name = "grapheme"
|
||||
version = "0.6.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ce/e7/bbaab0d2a33e07c8278910c1d0d8d4f3781293dfbc70b5c38197159046bf/grapheme-0.6.0.tar.gz", hash = "sha256:44c2b9f21bbe77cfb05835fec230bd435954275267fea1858013b102f8603cca", size = 207306 }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/ce/e7/bbaab0d2a33e07c8278910c1d0d8d4f3781293dfbc70b5c38197159046bf/grapheme-0.6.0.tar.gz", hash = "sha256:44c2b9f21bbe77cfb05835fec230bd435954275267fea1858013b102f8603cca", size = 207306, upload-time = "2020-03-07T17:13:55.492Z" }
|
||||
|
||||
[[package]]
|
||||
name = "toml"
|
||||
version = "0.10.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/be/ba/1f744cdc819428fc6b5084ec34d9b30660f6f9daaf70eead706e3203ec3c/toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f", size = 22253 }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/be/ba/1f744cdc819428fc6b5084ec34d9b30660f6f9daaf70eead706e3203ec3c/toml-0.10.2.tar.gz", hash = "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f", size = 22253, upload-time = "2020-11-01T01:40:22.204Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588 },
|
||||
{ url = "https://files.pythonhosted.org/packages/44/6f/7120676b6d73228c96e17f1f794d8ab046fc910d781c8d151120c3f1569e/toml-0.10.2-py2.py3-none-any.whl", hash = "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", size = 16588, upload-time = "2020-11-01T01:40:20.672Z" },
|
||||
]
|
||||
|
||||
Reference in New Issue
Block a user