feat: Update update command to work with querysets

This commit is contained in:
Cristian
2020-08-22 08:59:25 -05:00
committed by Cristian Vargas
parent dafa1dd63c
commit f55153eab3
4 changed files with 84 additions and 56 deletions

View File

@@ -392,45 +392,50 @@ def snapshot_filter(snapshots: QuerySet, filter_patterns: List[str], filter_type
return snapshots.filter(q_filter)
def get_indexed_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
def get_indexed_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
"""indexed links without checking archive status or data directory validity"""
links = [snapshot.as_link() for snapshot in snapshots.iterator()]
return {
link.link_dir: link
for link in links
}
def get_archived_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
def get_archived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
"""indexed links that are archived with a valid data directory"""
links = [snapshot.as_link() for snapshot in snapshots.iterator()]
return {
link.link_dir: link
for link in filter(is_archived, links)
}
def get_unarchived_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
def get_unarchived_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
"""indexed links that are unarchived with no data directory or an empty data directory"""
links = [snapshot.as_link() for snapshot in snapshots.iterator()]
return {
link.link_dir: link
for link in filter(is_unarchived, links)
}
def get_present_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
def get_present_folders(_snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
"""dirs that actually exist in the archive/ folder"""
all_folders = {}
for entry in os.scandir(os.path.join(out_dir, ARCHIVE_DIR_NAME)):
if entry.is_dir(follow_symlinks=True):
for entry in (Path(out_dir) / ARCHIVE_DIR_NAME).iterdir():
if entry.is_dir():
link = None
try:
link = parse_json_link_details(entry.path)
except Exception:
pass
all_folders[entry.path] = link
all_folders[entry.name] = link
return all_folders
def get_valid_folders(links, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
def get_valid_folders(snapshots, out_dir: str=OUTPUT_DIR) -> Dict[str, Optional[Link]]:
"""dirs with a valid index matched to the main index and archived content"""
links = [snapshot.as_link() for snapshot in snapshots.iterator()]
return {
link.link_dir: link
for link in filter(is_valid, links)