mirror of
https://github.com/ArchiveBox/ArchiveBox.git
synced 2026-04-05 23:37:58 +10:00
move system.py into misc folder
This commit is contained in:
260
archivebox/misc/system.py
Normal file
260
archivebox/misc/system.py
Normal file
@@ -0,0 +1,260 @@
|
||||
__package__ = 'archivebox'
|
||||
|
||||
|
||||
import os
|
||||
import signal
|
||||
import shutil
|
||||
import getpass
|
||||
|
||||
from json import dump
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union, Set, Tuple
|
||||
from subprocess import _mswindows, PIPE, Popen, CalledProcessError, CompletedProcess, TimeoutExpired
|
||||
|
||||
from crontab import CronTab
|
||||
from atomicwrites import atomic_write as lib_atomic_write
|
||||
|
||||
from .util import enforce_types, ExtendedEncoder
|
||||
from .config.legacy import OUTPUT_PERMISSIONS, DIR_OUTPUT_PERMISSIONS, ENFORCE_ATOMIC_WRITES
|
||||
|
||||
|
||||
def run(cmd, *args, input=None, capture_output=True, timeout=None, check=False, text=False, start_new_session=True, **kwargs):
|
||||
"""Patched of subprocess.run to kill forked child subprocesses and fix blocking io making timeout=innefective
|
||||
Mostly copied from https://github.com/python/cpython/blob/master/Lib/subprocess.py
|
||||
"""
|
||||
|
||||
cmd = [str(arg) for arg in cmd]
|
||||
|
||||
if input is not None:
|
||||
if kwargs.get('stdin') is not None:
|
||||
raise ValueError('stdin and input arguments may not both be used.')
|
||||
kwargs['stdin'] = PIPE
|
||||
|
||||
if capture_output:
|
||||
if ('stdout' in kwargs) or ('stderr' in kwargs):
|
||||
raise ValueError('stdout and stderr arguments may not be used with capture_output.')
|
||||
kwargs['stdout'] = PIPE
|
||||
kwargs['stderr'] = PIPE
|
||||
|
||||
pgid = None
|
||||
try:
|
||||
if isinstance(cmd, (list, tuple)) and cmd[0].endswith('.py'):
|
||||
PYTHON_BINARY = sys.executable
|
||||
cmd = (PYTHON_BINARY, *cmd)
|
||||
|
||||
with Popen(cmd, *args, start_new_session=start_new_session, text=text, **kwargs) as process:
|
||||
pgid = os.getpgid(process.pid)
|
||||
try:
|
||||
stdout, stderr = process.communicate(input, timeout=timeout)
|
||||
except TimeoutExpired as exc:
|
||||
process.kill()
|
||||
if _mswindows:
|
||||
# Windows accumulates the output in a single blocking
|
||||
# read() call run on child threads, with the timeout
|
||||
# being done in a join() on those threads. communicate()
|
||||
# _after_ kill() is required to collect that and add it
|
||||
# to the exception.
|
||||
exc.stdout, exc.stderr = process.communicate()
|
||||
else:
|
||||
# POSIX _communicate already populated the output so
|
||||
# far into the TimeoutExpired exception.
|
||||
process.wait()
|
||||
raise
|
||||
except: # Including KeyboardInterrupt, communicate handled that.
|
||||
process.kill()
|
||||
# We don't call process.wait() as .__exit__ does that for us.
|
||||
raise
|
||||
|
||||
retcode = process.poll()
|
||||
if check and retcode:
|
||||
raise CalledProcessError(retcode, process.args,
|
||||
output=stdout, stderr=stderr)
|
||||
finally:
|
||||
# force kill any straggler subprocesses that were forked from the main proc
|
||||
try:
|
||||
os.killpg(pgid, signal.SIGINT)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return CompletedProcess(process.args, retcode, stdout, stderr)
|
||||
|
||||
|
||||
@enforce_types
|
||||
def atomic_write(path: Union[Path, str], contents: Union[dict, str, bytes], overwrite: bool=True) -> None:
|
||||
"""Safe atomic write to filesystem by writing to temp file + atomic rename"""
|
||||
|
||||
mode = 'wb+' if isinstance(contents, bytes) else 'w'
|
||||
encoding = None if isinstance(contents, bytes) else 'utf-8' # enforce utf-8 on all text writes
|
||||
|
||||
# print('\n> Atomic Write:', mode, path, len(contents), f'overwrite={overwrite}')
|
||||
try:
|
||||
with lib_atomic_write(path, mode=mode, overwrite=overwrite, encoding=encoding) as f:
|
||||
if isinstance(contents, dict):
|
||||
dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
|
||||
elif isinstance(contents, (bytes, str)):
|
||||
f.write(contents)
|
||||
except OSError as e:
|
||||
if ENFORCE_ATOMIC_WRITES:
|
||||
print(f"[X] OSError: Failed to write {path} with fcntl.F_FULLFSYNC. ({e})")
|
||||
print(" You can store the archive/ subfolder on a hard drive or network share that doesn't support support syncronous writes,")
|
||||
print(" but the main folder containing the index.sqlite3 and ArchiveBox.conf files must be on a filesystem that supports FSYNC.")
|
||||
raise SystemExit(1)
|
||||
|
||||
# retry the write without forcing FSYNC (aka atomic mode)
|
||||
with open(path, mode=mode, encoding=encoding) as f:
|
||||
if isinstance(contents, dict):
|
||||
dump(contents, f, indent=4, sort_keys=True, cls=ExtendedEncoder)
|
||||
elif isinstance(contents, (bytes, str)):
|
||||
f.write(contents)
|
||||
|
||||
# set file permissions
|
||||
os.chmod(path, int(OUTPUT_PERMISSIONS, base=8))
|
||||
|
||||
@enforce_types
|
||||
def chmod_file(path: str, cwd: str='.') -> None:
|
||||
"""chmod -R <permissions> <cwd>/<path>"""
|
||||
|
||||
root = Path(cwd) / path
|
||||
if not root.exists():
|
||||
raise Exception('Failed to chmod: {} does not exist (did the previous step fail?)'.format(path))
|
||||
|
||||
if not root.is_dir():
|
||||
# path is just a plain file
|
||||
os.chmod(root, int(OUTPUT_PERMISSIONS, base=8))
|
||||
else:
|
||||
for subpath in Path(path).glob('**/*'):
|
||||
if subpath.is_dir():
|
||||
# directories need execute permissions to be able to list contents
|
||||
os.chmod(subpath, int(DIR_OUTPUT_PERMISSIONS, base=8))
|
||||
else:
|
||||
os.chmod(subpath, int(OUTPUT_PERMISSIONS, base=8))
|
||||
|
||||
|
||||
@enforce_types
|
||||
def copy_and_overwrite(from_path: Union[str, Path], to_path: Union[str, Path]):
|
||||
"""copy a given file or directory to a given path, overwriting the destination"""
|
||||
if Path(from_path).is_dir():
|
||||
shutil.rmtree(to_path, ignore_errors=True)
|
||||
shutil.copytree(from_path, to_path)
|
||||
else:
|
||||
with open(from_path, 'rb') as src:
|
||||
contents = src.read()
|
||||
atomic_write(to_path, contents)
|
||||
|
||||
|
||||
@enforce_types
|
||||
def get_dir_size(path: Union[str, Path], recursive: bool=True, pattern: Optional[str]=None) -> Tuple[int, int, int]:
|
||||
"""get the total disk size of a given directory, optionally summing up
|
||||
recursively and limiting to a given filter list
|
||||
"""
|
||||
num_bytes, num_dirs, num_files = 0, 0, 0
|
||||
try:
|
||||
for entry in os.scandir(path):
|
||||
if (pattern is not None) and (pattern not in entry.path):
|
||||
continue
|
||||
if entry.is_dir(follow_symlinks=False):
|
||||
if not recursive:
|
||||
continue
|
||||
num_dirs += 1
|
||||
bytes_inside, dirs_inside, files_inside = get_dir_size(entry.path)
|
||||
num_bytes += bytes_inside
|
||||
num_dirs += dirs_inside
|
||||
num_files += files_inside
|
||||
else:
|
||||
num_bytes += entry.stat(follow_symlinks=False).st_size
|
||||
num_files += 1
|
||||
except OSError:
|
||||
# e.g. FileNameTooLong or other error while trying to read dir
|
||||
pass
|
||||
return num_bytes, num_dirs, num_files
|
||||
|
||||
|
||||
CRON_COMMENT = 'archivebox_schedule'
|
||||
|
||||
|
||||
@enforce_types
|
||||
def dedupe_cron_jobs(cron: CronTab) -> CronTab:
|
||||
deduped: Set[Tuple[str, str]] = set()
|
||||
|
||||
for job in list(cron):
|
||||
unique_tuple = (str(job.slices), str(job.command))
|
||||
if unique_tuple not in deduped:
|
||||
deduped.add(unique_tuple)
|
||||
cron.remove(job)
|
||||
|
||||
for schedule, command in deduped:
|
||||
job = cron.new(command=command, comment=CRON_COMMENT)
|
||||
job.setall(schedule)
|
||||
job.enable()
|
||||
|
||||
return cron
|
||||
|
||||
|
||||
class suppress_output(object):
|
||||
"""
|
||||
A context manager for doing a "deep suppression" of stdout and stderr in
|
||||
Python, i.e. will suppress all print, even if the print originates in a
|
||||
compiled C/Fortran sub-function.
|
||||
|
||||
This will not suppress raised exceptions, since exceptions are printed
|
||||
to stderr just before a script exits, and after the context manager has
|
||||
exited (at least, I think that is why it lets exceptions through).
|
||||
|
||||
with suppress_stdout_stderr():
|
||||
rogue_function()
|
||||
"""
|
||||
|
||||
def __init__(self, stdout=True, stderr=True):
|
||||
# Open a pair of null files
|
||||
# Save the actual stdout (1) and stderr (2) file descriptors.
|
||||
self.stdout, self.stderr = stdout, stderr
|
||||
if stdout:
|
||||
self.null_stdout = os.open(os.devnull, os.O_RDWR)
|
||||
self.real_stdout = os.dup(1)
|
||||
if stderr:
|
||||
self.null_stderr = os.open(os.devnull, os.O_RDWR)
|
||||
self.real_stderr = os.dup(2)
|
||||
|
||||
def __enter__(self):
|
||||
# Assign the null pointers to stdout and stderr.
|
||||
if self.stdout:
|
||||
os.dup2(self.null_stdout, 1)
|
||||
if self.stderr:
|
||||
os.dup2(self.null_stderr, 2)
|
||||
|
||||
def __exit__(self, *_):
|
||||
# Re-assign the real stdout/stderr back to (1) and (2)
|
||||
if self.stdout:
|
||||
os.dup2(self.real_stdout, 1)
|
||||
os.close(self.null_stdout)
|
||||
if self.stderr:
|
||||
os.dup2(self.real_stderr, 2)
|
||||
os.close(self.null_stderr)
|
||||
|
||||
|
||||
def get_system_user() -> str:
|
||||
# some host OS's are unable to provide a username (k3s, Windows), making this complicated
|
||||
# uid 999 is especially problematic and breaks many attempts
|
||||
SYSTEM_USER = None
|
||||
FALLBACK_USER_PLACHOLDER = f'user_{os.getuid()}'
|
||||
|
||||
# Option 1
|
||||
try:
|
||||
import pwd
|
||||
SYSTEM_USER = SYSTEM_USER or pwd.getpwuid(os.geteuid()).pw_name
|
||||
except (ModuleNotFoundError, Exception):
|
||||
pass
|
||||
|
||||
# Option 2
|
||||
try:
|
||||
SYSTEM_USER = SYSTEM_USER or getpass.getuser()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Option 3
|
||||
try:
|
||||
SYSTEM_USER = SYSTEM_USER or os.getlogin()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return SYSTEM_USER or FALLBACK_USER_PLACHOLDER
|
||||
@@ -2,7 +2,7 @@ __package__ = 'abx.archivebox'
|
||||
|
||||
from django.test import TestCase
|
||||
|
||||
from .ini_to_toml import convert, TOML_HEADER
|
||||
from .toml_util import convert, TOML_HEADER
|
||||
|
||||
TEST_INPUT = """
|
||||
[SERVER_CONFIG]
|
||||
|
||||
@@ -3,8 +3,12 @@ from typing import Any, List, Callable
|
||||
import json
|
||||
import ast
|
||||
import inspect
|
||||
import toml
|
||||
import re
|
||||
import configparser
|
||||
|
||||
from pathlib import Path, PosixPath
|
||||
|
||||
from pydantic.json_schema import GenerateJsonSchema
|
||||
from pydantic_core import to_jsonable_python
|
||||
|
||||
@@ -68,8 +72,12 @@ def convert(ini_str: str) -> str:
|
||||
|
||||
|
||||
class JSONSchemaWithLambdas(GenerateJsonSchema):
|
||||
"""
|
||||
Encode lambda functions in default values properly.
|
||||
Usage:
|
||||
>>> json.dumps(value, encoder=JSONSchemaWithLambdas())
|
||||
"""
|
||||
def encode_default(self, default: Any) -> Any:
|
||||
"""Encode lambda functions in default values properly"""
|
||||
config = self._config
|
||||
if isinstance(default, Callable):
|
||||
return '{{lambda ' + inspect.getsource(default).split('=lambda ')[-1].strip()[:-1] + '}}'
|
||||
@@ -83,3 +91,24 @@ class JSONSchemaWithLambdas(GenerateJsonSchema):
|
||||
# for computed_field properties render them like this instead:
|
||||
# inspect.getsource(field.wrapped_property.fget).split('def ', 1)[-1].split('\n', 1)[-1].strip().strip('return '),
|
||||
|
||||
|
||||
def better_toml_dump_str(val: Any) -> str:
|
||||
try:
|
||||
return toml.encoder._dump_str(val) # type: ignore
|
||||
except Exception:
|
||||
# if we hit any of toml's numerous encoding bugs,
|
||||
# fall back to using json representation of string
|
||||
return json.dumps(str(val))
|
||||
|
||||
class CustomTOMLEncoder(toml.encoder.TomlEncoder):
|
||||
"""
|
||||
Custom TomlEncoder to work around https://github.com/uiri/toml's many encoding bugs.
|
||||
More info: https://github.com/fabiocaccamo/python-benedict/issues/439
|
||||
>>> toml.dumps(value, encoder=CustomTOMLEncoder())
|
||||
"""
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.dump_funcs[Path] = lambda x: json.dumps(str(x))
|
||||
self.dump_funcs[PosixPath] = lambda x: json.dumps(str(x))
|
||||
self.dump_funcs[str] = better_toml_dump_str
|
||||
self.dump_funcs[re.RegexFlag] = better_toml_dump_str
|
||||
Reference in New Issue
Block a user