Major additions to support JSON files and provide compile time options

- Add file_json/get_file_json handling.
  This creates a new global template function to treat a file as a
  json file and returns a dict.

- Add some tools for merging dictionaries.

- Add command-line settable variables that get inserted into metadata
  tree so that at runtime options can be set.
This commit is contained in:
Cassowary 2021-12-19 22:02:47 -08:00
parent 4780764a60
commit 357db6eca4
5 changed files with 69 additions and 5 deletions

View File

@ -1 +1 @@
__version__ = '0.3.0' __version__ = '0.5.0'

View File

@ -22,6 +22,7 @@ from .template_tools import (
file_content, file_content,
file_list, file_list,
file_list_hier, file_list_hier,
file_json,
file_metadata, file_metadata,
file_name, file_name,
file_raw, file_raw,
@ -35,6 +36,12 @@ def setup_logging(verbose: bool = False) -> None:
pass pass
def parse_var(varspec: str) -> List:
if (not ('=' in varspec)):
return [varspec, True]
return list(varspec.split('=', 2))
def get_args(args: List[str]) -> argparse.Namespace: def get_args(args: List[str]) -> argparse.Namespace:
parser = argparse.ArgumentParser("Compile a Pixywerk directory into an output directory.") parser = argparse.ArgumentParser("Compile a Pixywerk directory into an output directory.")
@ -50,8 +57,9 @@ def get_args(args: List[str]) -> argparse.Namespace:
parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true") parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true")
parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true") parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true")
parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None) parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None)
parser.add_argument(
"-D", "--define", help="Add a variable to the metadata.", nargs="+", action="extend", type=parse_var)
result = parser.parse_args(args) result = parser.parse_args(args)
# validate arguments # validate arguments
if not os.path.isdir(result.root): if not os.path.isdir(result.root):
raise FileNotFoundError("can't find root folder {}".format(result.root)) raise FileNotFoundError("can't find root folder {}".format(result.root))
@ -89,17 +97,21 @@ def main() -> int:
"author": "", "author": "",
"author_email": "", "author_email": "",
} }
for var in args.define:
default_metadata[var[0]] = var[1]
meta_tree = MetaTree(args.root, default_metadata) meta_tree = MetaTree(args.root, default_metadata)
file_list_cache = cast(Dict, {}) file_list_cache = cast(Dict, {})
file_cont_cache = cast(Dict, {}) file_cont_cache = cast(Dict, {})
file_name_cache = cast(Dict, {}) file_name_cache = cast(Dict, {})
file_raw_cache = cast(Dict, {}) file_raw_cache = cast(Dict, {})
file_json_cache = cast(Dict, {})
flist = file_list(args.root, file_list_cache) flist = file_list(args.root, file_list_cache)
default_metadata["globals"] = { default_metadata["globals"] = {
"get_file_list": flist, "get_file_list": flist,
"get_hier": file_list_hier(args.root, flist), "get_hier": file_list_hier(args.root, flist),
"get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache), "get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
"get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache), "get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
"get_json": file_json(args.root, file_json_cache),
"get_raw": file_raw(args.root, file_raw_cache), "get_raw": file_raw(args.root, file_raw_cache),
"get_file_metadata": file_metadata(meta_tree), "get_file_metadata": file_metadata(meta_tree),
"get_time_iso8601": time_iso8601("UTC"), "get_time_iso8601": time_iso8601("UTC"),
@ -127,7 +139,7 @@ def main() -> int:
continue continue
metadata = meta_tree.get_metadata(os.path.join(workroot, f)) metadata = meta_tree.get_metadata(os.path.join(workroot, f))
chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata) chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata)
print("process {} -> {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))) print("process {} -> {} -> {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename)))
if not args.dry_run: if not args.dry_run:
try: try:
with open(os.path.join(target_dir, chain.output_filename), "w") as outfile: with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:

View File

@ -90,6 +90,9 @@ class ProcessorChain:
fname = processor.filename(fname, self._ctx) fname = processor.filename(fname, self._ctx)
return fname return fname
def __repr__(self) -> str:
return "[" + ",".join([x.__class__.__name__ for x in self._processors]) + "]"
class ProcessorChains: class ProcessorChains:
"""Load a configuration for processor chains, and provide ability to process the chains given a particular input """Load a configuration for processor chains, and provide ability to process the chains given a particular input

View File

@ -1,17 +1,26 @@
import copy
import datetime import datetime
import glob import glob
import itertools import itertools
import os import os
from typing import Callable, Dict, Iterable, List, Union, cast, Tuple from typing import Callable, Dict, Iterable, List, Union, cast, Tuple
import jstyleson
import pytz import pytz
from .metadata import MetaTree from .metadata import MetaTree
from .processchain import ProcessorChains from .processchain import ProcessorChains
from .utils import deep_merge_dicts
def file_list(root: str, listcache: Dict) -> Callable: def file_list(root: str, listcache: Dict) -> Callable:
def get_file_list(path_glob: Union[str, List[str], Tuple[str]], *, sort_order: str = "ctime", reverse: bool = False, limit: int = 0) -> Iterable: def get_file_list(
path_glob: Union[str, List[str], Tuple[str]],
*,
sort_order: str = "ctime",
reverse: bool = False,
limit: int = 0) -> Iterable:
stattable = cast(List, []) stattable = cast(List, [])
if isinstance(path_glob, str): if isinstance(path_glob, str):
path_glob = [path_glob] path_glob = [path_glob]
@ -87,6 +96,18 @@ def file_raw(root: str, contcache: Dict) -> Callable:
return get_raw return get_raw
def file_json(root: str) -> Callable:
def get_json(file_name: str, parent: Dict = None) -> Dict:
outd = {}
if parent is not None:
outd = copy.deepcopy(parent)
with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
return deep_merge_dicts(outd, jstyleson.load(f))
return get_json
def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable: def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable:
def get_file_content(file_name: str) -> Iterable: def get_file_content(file_name: str) -> Iterable:
if file_name in contcache: if file_name in contcache:

View File

@ -4,7 +4,7 @@ from typing import Dict, Optional
def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict: def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict:
"""Merge two dictionaries. """Merge two dictionaries (shallow).
Arguments: Arguments:
dict_a (dict): The dictionary to use as the base. dict_a (dict): The dictionary to use as the base.
@ -19,6 +19,34 @@ def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict:
return dict_z return dict_z
def deep_merge_dicts(dict_a: Dict, dict_b: Dict, _path=None) -> Dict:
"""Merge two dictionaries (deep).
https://stackoverflow.com/questions/7204805/how-to-merge-dictionaries-of-dictionaries/7205107#7205107
Arguments:
dict_a (dict): The dictionary to use as the base.
dict_b (dict): The dictionary to update the values with.
_path (list): internal use.
Returns:
dict: A new merged dictionary.
"""
if _path is None:
_path = []
for key in dict_b:
if key in dict_a:
if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict):
deep_merge_dicts(dict_a[key], dict_b[key], _path + [str(key)])
elif dict_a[key] == dict_b[key]:
pass # same leaf value
else:
raise Exception('Conflict at %s' % '.'.join(_path + [str(key)]))
else:
dict_a[key] = dict_b[key]
return dict_a
def guess_mime(path: str) -> Optional[str]: def guess_mime(path: str) -> Optional[str]:
"""Guess the mime type for a given path. """Guess the mime type for a given path.