From 357db6eca45243c6a53c7dc78b83f42f117dd8cb Mon Sep 17 00:00:00 2001 From: Cassowary Rusnov Date: Sun, 19 Dec 2021 22:02:47 -0800 Subject: [PATCH] Major additions to support JSON files and provide compile time options - Add file_json/get_file_json handling. This creates a new global template function to treat a file as a json file and returns a dict. - Add some tools for merging dictionaries. - Add command-line settable variables that get inserted into metadata tree so that at runtime options can be set. --- pixywerk2/__init__.py | 2 +- pixywerk2/__main__.py | 16 ++++++++++++++-- pixywerk2/processchain.py | 3 +++ pixywerk2/template_tools.py | 23 ++++++++++++++++++++++- pixywerk2/utils.py | 30 +++++++++++++++++++++++++++++- 5 files changed, 69 insertions(+), 5 deletions(-) diff --git a/pixywerk2/__init__.py b/pixywerk2/__init__.py index 0404d81..2b8877c 100644 --- a/pixywerk2/__init__.py +++ b/pixywerk2/__init__.py @@ -1 +1 @@ -__version__ = '0.3.0' +__version__ = '0.5.0' diff --git a/pixywerk2/__main__.py b/pixywerk2/__main__.py index 72f5cd0..d63fcbd 100644 --- a/pixywerk2/__main__.py +++ b/pixywerk2/__main__.py @@ -22,6 +22,7 @@ from .template_tools import ( file_content, file_list, file_list_hier, + file_json, file_metadata, file_name, file_raw, @@ -35,6 +36,12 @@ def setup_logging(verbose: bool = False) -> None: pass +def parse_var(varspec: str) -> List: + if (not ('=' in varspec)): + return [varspec, True] + return list(varspec.split('=', 2)) + + def get_args(args: List[str]) -> argparse.Namespace: parser = argparse.ArgumentParser("Compile a Pixywerk directory into an output directory.") @@ -50,8 +57,9 @@ def get_args(args: List[str]) -> argparse.Namespace: parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true") parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true") parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None) + parser.add_argument( + "-D", "--define", help="Add a variable to the metadata.", nargs="+", action="extend", type=parse_var) result = parser.parse_args(args) - # validate arguments if not os.path.isdir(result.root): raise FileNotFoundError("can't find root folder {}".format(result.root)) @@ -89,17 +97,21 @@ def main() -> int: "author": "", "author_email": "", } + for var in args.define: + default_metadata[var[0]] = var[1] meta_tree = MetaTree(args.root, default_metadata) file_list_cache = cast(Dict, {}) file_cont_cache = cast(Dict, {}) file_name_cache = cast(Dict, {}) file_raw_cache = cast(Dict, {}) + file_json_cache = cast(Dict, {}) flist = file_list(args.root, file_list_cache) default_metadata["globals"] = { "get_file_list": flist, "get_hier": file_list_hier(args.root, flist), "get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache), "get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache), + "get_json": file_json(args.root, file_json_cache), "get_raw": file_raw(args.root, file_raw_cache), "get_file_metadata": file_metadata(meta_tree), "get_time_iso8601": time_iso8601("UTC"), @@ -127,7 +139,7 @@ def main() -> int: continue metadata = meta_tree.get_metadata(os.path.join(workroot, f)) chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata) - print("process {} -> {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))) + print("process {} -> {} -> {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename))) if not args.dry_run: try: with open(os.path.join(target_dir, chain.output_filename), "w") as outfile: diff --git a/pixywerk2/processchain.py b/pixywerk2/processchain.py index 5124809..cc1e98c 100644 --- a/pixywerk2/processchain.py +++ b/pixywerk2/processchain.py @@ -90,6 +90,9 @@ class ProcessorChain: fname = processor.filename(fname, self._ctx) return fname + def __repr__(self) -> str: + return "[" + ",".join([x.__class__.__name__ for x in self._processors]) + "]" + class ProcessorChains: """Load a configuration for processor chains, and provide ability to process the chains given a particular input diff --git a/pixywerk2/template_tools.py b/pixywerk2/template_tools.py index fff688a..328145e 100644 --- a/pixywerk2/template_tools.py +++ b/pixywerk2/template_tools.py @@ -1,17 +1,26 @@ +import copy import datetime import glob import itertools import os from typing import Callable, Dict, Iterable, List, Union, cast, Tuple +import jstyleson + import pytz from .metadata import MetaTree from .processchain import ProcessorChains +from .utils import deep_merge_dicts def file_list(root: str, listcache: Dict) -> Callable: - def get_file_list(path_glob: Union[str, List[str], Tuple[str]], *, sort_order: str = "ctime", reverse: bool = False, limit: int = 0) -> Iterable: + def get_file_list( + path_glob: Union[str, List[str], Tuple[str]], + *, + sort_order: str = "ctime", + reverse: bool = False, + limit: int = 0) -> Iterable: stattable = cast(List, []) if isinstance(path_glob, str): path_glob = [path_glob] @@ -87,6 +96,18 @@ def file_raw(root: str, contcache: Dict) -> Callable: return get_raw +def file_json(root: str) -> Callable: + def get_json(file_name: str, parent: Dict = None) -> Dict: + outd = {} + if parent is not None: + outd = copy.deepcopy(parent) + + with open(os.path.join(root, file_name), "r", encoding="utf-8") as f: + return deep_merge_dicts(outd, jstyleson.load(f)) + + return get_json + + def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable: def get_file_content(file_name: str) -> Iterable: if file_name in contcache: diff --git a/pixywerk2/utils.py b/pixywerk2/utils.py index 2f16f66..16e8e10 100644 --- a/pixywerk2/utils.py +++ b/pixywerk2/utils.py @@ -4,7 +4,7 @@ from typing import Dict, Optional def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict: - """Merge two dictionaries. + """Merge two dictionaries (shallow). Arguments: dict_a (dict): The dictionary to use as the base. @@ -19,6 +19,34 @@ def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict: return dict_z +def deep_merge_dicts(dict_a: Dict, dict_b: Dict, _path=None) -> Dict: + """Merge two dictionaries (deep). + https://stackoverflow.com/questions/7204805/how-to-merge-dictionaries-of-dictionaries/7205107#7205107 + + Arguments: + dict_a (dict): The dictionary to use as the base. + dict_b (dict): The dictionary to update the values with. + _path (list): internal use. + + Returns: + dict: A new merged dictionary. + + """ + if _path is None: + _path = [] + for key in dict_b: + if key in dict_a: + if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict): + deep_merge_dicts(dict_a[key], dict_b[key], _path + [str(key)]) + elif dict_a[key] == dict_b[key]: + pass # same leaf value + else: + raise Exception('Conflict at %s' % '.'.join(_path + [str(key)])) + else: + dict_a[key] = dict_b[key] + return dict_a + + def guess_mime(path: str) -> Optional[str]: """Guess the mime type for a given path.