Major cleanup and fixage for new metadata stuff and heckformat

- Clean up a ton of documentation.
- Make the modules import nicely.
- Add a cool logo to the command line tool
- Make the command-line tool use tqdm
- Make the command-line tool load the metadata in a separate loop before
  processing the files.
- Fix error handling in the command-line tool processing loops so they
  work correctly (and jinja errors are more useful)
- Make command-line tool exit non-zero if there were errors.
- Fix load metadata to handle formats and errors better (and return {}
  if it fails)
This commit is contained in:
Cassowary 2024-02-27 21:50:03 -08:00
parent b389506b4b
commit 690f110bc5
9 changed files with 258 additions and 53 deletions

View File

@ -20,6 +20,12 @@
* Run commands as part of processing chains * Run commands as part of processing chains
* Project level processing chain overrides in the .meta or whatever. * Project level processing chain overrides in the .meta or whatever.
* Project settings in separate file from .meta that would basically do .meta stuff. Like global meta + config in a top.heck file by default and overridable by a parameter. * Project settings in separate file from .meta that would basically do .meta stuff. Like global meta + config in a top.heck file by default and overridable by a parameter. Maybe
a nice default filename that doesn't start with . (whereas .meta or .heck is the current base metadata)
* Handle the fact that HECKformat metadata is always a list in a more elegant way. We have some hacks in place where scalar values are expected, but if a project mixes
metadata formats it gets a bit messy.
* Provide a database-like interface to the global metadata tree, so that metameta page templates could query the database to assemble indexes and whatnot without looping over the actual files.

View File

@ -1 +1,14 @@
__version__ = '0.7.0' """
HeckWeasel: Metadata based static site compiler.
"""
__version__ = '0.7.1'
__copyright__ = "©2023-2024 Aldercone Studio Collective"
from . import metadata
from . import processors
from . import __main__
from . import processchain
from . import processors
from . import template_tools
from . import pygments
from . import utils

View File

@ -1,3 +1,9 @@
"""
HeckWeasel command line interface.
Performs compilation step given an input directory. See --help for more information.
"""
# iterate source tree # iterate source tree
# create directors in target tree # create directors in target tree
# for each item: # for each item:
@ -11,7 +17,13 @@ import os
import shutil import shutil
import sys import sys
import time import time
from typing import Dict, List, cast
import jinja2.exceptions
from pathlib import Path
from typing import Dict, List, cast, Union
import tqdm
from .metadata import MetaTree from .metadata import MetaTree
from .processchain import ProcessorChains from .processchain import ProcessorChains
@ -23,19 +35,69 @@ from .template_tools import (
file_list, file_list,
file_list_hier, file_list_hier,
file_json, file_json,
file_heck,
file_metadata, file_metadata,
file_name, file_name,
file_raw, file_raw,
time_iso8601, time_iso8601,
containsone,
) )
from .utils import deep_merge_dicts from .utils import deep_merge_dicts
from .__init__ import __version__, __copyright__
logger = logging.getLogger() logger = logging.getLogger('heckweasel')
logo = f"""
Aldercone Studio Collective
_ _ _
| |_ ___ __| |____ __ _____ __ _ ___ ___| |
| ' \/ -_) _| / /\ V V / -_) _` (_-</ -_) |
|_||_\___\__|_\_\ \_/\_/\___\__,_/__/\___|_|
{__version__}
"""
def setup_logging(verbose: bool = False) -> None: class TqdmLoggingHandler(logging.Handler):
pass """
A simple logging wrapper that won't clobber TQDM's progress bar.
"""
def __init__(self, level=logging.NOTSET):
super().__init__(level)
def emit(self, record):
try:
msg = self.format(record)
tqdm.tqdm.write(msg)
self.flush()
except Exception:
self.handleError(record)
def setup_logging(verbose:bool=False, quiet:bool=False, logfile:Union[Path, str, None]=None) -> None:
    """
    Attach console (and optionally file) handlers to the module-level logger.

    The logger itself is opened up to DEBUG; each handler then filters to
    its own level.

    Args:
        verbose: Console shows DEBUG and above, with timestamp and module
            name. Checked first, so it wins when ``quiet`` is also set.
        quiet: Console shows CRITICAL only.
        logfile: When truthy, a ``logging.FileHandler`` on this path is
            also attached, at DEBUG level with the detailed format.
    """
    detailed_format = '%(asctime)s %(module)-12s %(levelname)-8s %(message)s'
    brief_format = '%(levelname)-8s %(message)s'

    # Choose the console level and layout up front; verbose beats quiet.
    if verbose:
        console_level, console_format = logging.DEBUG, detailed_format
    elif quiet:
        console_level, console_format = logging.CRITICAL, brief_format
    else:
        console_level, console_format = logging.INFO, brief_format

    logger.setLevel(logging.DEBUG)

    # Console output goes through the tqdm-aware handler.
    console_handler = TqdmLoggingHandler()
    console_handler.setLevel(console_level)
    console_handler.setFormatter(logging.Formatter(console_format))
    logger.addHandler(console_handler)

    if not logfile:
        return

    # The log file always gets everything, in the detailed layout.
    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(detailed_format))
    logger.addHandler(file_handler)
def parse_var(varspec: str) -> List: def parse_var(varspec: str) -> List:
if (not ('=' in varspec)): if (not ('=' in varspec)):
@ -73,15 +135,17 @@ def get_args(args: List[str]) -> argparse.Namespace:
def main() -> int: def main() -> int:
print(logo)
try: try:
args = get_args(sys.argv[1:]) args = get_args(sys.argv[1:])
except FileNotFoundError as ex: except FileNotFoundError as ex:
print("error finding arguments: {}".format(ex)) logger.info("error finding arguments: {}".format(ex))
return 1 return 1
setup_logging(args.verbose) setup_logging(args.verbose)
if os.path.exists(args.output) and args.clean: if os.path.exists(args.output) and args.clean:
bak = "{}.bak-{}".format(args.output, int(time.time())) bak = "{}.bak-{}".format(args.output, int(time.time()))
print("cleaning target {} -> {}".format(args.output, bak)) logger.info("cleaning target {} -> {}".format(args.output, bak))
os.rename(args.output, bak) os.rename(args.output, bak)
process_chains = ProcessorChains(args.processors) process_chains = ProcessorChains(args.processors)
@ -98,21 +162,26 @@ def main() -> int:
"author": "", "author": "",
"author_email": "", "author_email": "",
} }
if args.define: if args.define:
for var in args.define: for var in args.define:
default_metadata[var[0]] = var[1] default_metadata[var[0]] = var[1]
meta_tree = MetaTree(args.root, default_metadata) meta_tree = MetaTree(args.root, default_metadata)
file_list_cache = cast(Dict, {}) file_list_cache = cast(Dict, {})
file_cont_cache = cast(Dict, {}) file_cont_cache = cast(Dict, {})
file_name_cache = cast(Dict, {}) file_name_cache = cast(Dict, {})
file_raw_cache = cast(Dict, {}) file_raw_cache = cast(Dict, {})
flist = file_list(args.root, file_list_cache) flist = file_list(args.root, file_list_cache)
default_metadata["globals"] = { default_metadata["globals"] = {
"get_file_list": flist, "get_file_list": flist,
"get_hier": file_list_hier(args.root, flist), "get_hier": file_list_hier(args.root, flist),
"get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache), "get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
"get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache), "get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
"get_json": file_json(args.root), "get_json": file_json(args.root),
"get_heck": file_heck(args.root),
"get_raw": file_raw(args.root, file_raw_cache), "get_raw": file_raw(args.root, file_raw_cache),
"get_file_metadata": file_metadata(meta_tree), "get_file_metadata": file_metadata(meta_tree),
"get_time_iso8601": time_iso8601("UTC"), "get_time_iso8601": time_iso8601("UTC"),
@ -120,43 +189,87 @@ def main() -> int:
"pygments_get_css": pygments_get_css, "pygments_get_css": pygments_get_css,
"pygments_markup_contents_html": pygments_markup_contents_html, "pygments_markup_contents_html": pygments_markup_contents_html,
"merge_dicts": deep_merge_dicts, "merge_dicts": deep_merge_dicts,
"containsone": containsone,
} }
# fixme add no-progress option for loop just to be the files
md = {}
haderrors = False
logger.info("Gathering all metadata")
for root, _, files in os.walk(args.root, followlinks=args.follow_links):
workroot = os.path.relpath(root, args.root)
if workroot == ".":
workroot = ""
for f in tqdm.tqdm(files, desc="Gathering metadata", unit="files", dynamic_ncols=True, leave=False):
try:
# fixme global generic filters
if f.endswith(".meta") or f.endswith("~"):
continue
pth = os.path.join(workroot, f)
metadata = meta_tree.get_metadata(pth)
if args.verbose:
logger.debug(f"metadata: {metadata}")
if pth in md:
logger.error("[!] multiple meta? ", pth)
haderrors = True
md[pth] = metadata
except BaseException as inst:
# fixme optionally exit on error?
logger.error(f"[S] Error loading metadata for {pth} Error was: {inst} (skipped)")
# technically metatree has all the md in its cache, but we also have md in a dictionary so who's to say. I guess
# we should make a separate object that lets you query md.
logger.info("Building Webbed Site")
for root, _, files in os.walk(args.root, followlinks=args.follow_links): for root, _, files in os.walk(args.root, followlinks=args.follow_links):
workroot = os.path.relpath(root, args.root) workroot = os.path.relpath(root, args.root)
if workroot == ".": if workroot == ".":
workroot = "" workroot = ""
target_dir = os.path.join(args.output, workroot) target_dir = os.path.join(args.output, workroot)
print("mkdir -> {}".format(target_dir)) logger.info("[D] Make directory -> {}".format(target_dir))
if not args.dry_run: if not args.dry_run:
try: try:
os.mkdir(target_dir) os.mkdir(target_dir)
except FileExistsError: except FileExistsError:
if args.safe: if args.safe:
print("error, target directory exists, aborting") logger.info("[A] Error, target directory exists and we are in safe mode, aborting")
return 1 return 1
for f in files: for f in tqdm.tqdm(files, desc="Building webbed site", unit="files", dynamic_ncols=True, leave=False):
# fixme global generic filters # fixme global generic filters
if f.endswith(".meta") or f.endswith("~"): try:
continue if f.endswith(".meta") or f.endswith("~"):
metadata = meta_tree.get_metadata(os.path.join(workroot, f)) continue
chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata) metadata = md[os.path.join(workroot, f)]
print("process {} -> {} -> {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename))) chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata)
if not args.dry_run: if args.verbose:
try: logger.debug(f"metadata: {metadata}")
# normal output logger.info("[P] Processing {} -> chains: {} -> output: {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename)))
# FIXME support binary streams if not args.dry_run:
collected_output = [line for line in chain.output] try:
with open(os.path.join(target_dir, chain.output_filename), "w") as outfile: # normal output
outfile.writelines(collected_output) # FIXME support binary streams
except PassthroughException: collected_output = [line for line in chain.output]
# write output from input with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:
shutil.copyfile(os.path.join(root, f), os.path.join(target_dir, chain.output_filename)) outfile.writelines(collected_output)
except NoOutputException: except PassthroughException:
print("skip output {} -> {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))) # write output from input
# don't write anyp output shutil.copyfile(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))
pass except NoOutputException:
logger.warn("[S] No content or output prevented {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename)))
# don't write anyp output
pass
except jinja2.exceptions.TemplateSyntaxError as inst:
logger.error(f"[!][S] Template error processing {f} Error was: {inst.filename}:{inst.lineno} {inst.message} (skipped)")
haderrors = True
except BaseException as inst:
# fixme optionally exit on error?
logger.error(f"[!][S] General error processing {f} Error was: {inst} (skipped)")
haderrors = True
if haderrors:
logger.error("One or more errors in processing.")
return 1
return 0 return 0
def do_main(): def do_main():

View File

@ -108,26 +108,28 @@ class MetaTree:
def _load_metadata(self, cachekey: str) -> Dict: def _load_metadata(self, cachekey: str) -> Dict:
meta = {} meta = {}
with open(cachekey, "r") as inf: try:
if cachekey.endswith(".heck"): with open(cachekey, "r") as inf:
# raise NotImplemented("We don't yet support HECKformat") if cachekey.endswith(".heck"):
with open(cachekey) as cachefile: with open(cachekey) as cachefile:
h = heckformat.parse.load(cachefile) h = heckformat.parse.load(cachefile)
meta = h.flatten_replace() meta = h.flatten_replace()
else: else:
try:
# try json load
meta = jstyleson.load(inf)
except jstyleson.JSONDecodeError as exc:
# try yaml load
try: try:
meta = yaml.load(inf) # try json load
except yaml.parser.ParserError as exc2: meta = jstyleson.load(inf)
# else either the yaml or json has an error except jstyleson.JSONDecodeError as exc:
me = MetaLoadError() # try yaml load
exc2.__context__ = exc try:
raise me from exc2 meta = yaml.load(inf)
return meta except yaml.parser.ParserError as exc2:
# else either the yaml or json has an error
me = MetaLoadError()
exc2.__context__ = exc
except BaseException as inst:
logger.error(f"Can't load any metadata for key {cachekey}: {inst}")
return meta
def get_metadata(self, rel_path: str) -> Dict: def get_metadata(self, rel_path: str) -> Dict:
"""Retrieve the metadata for a given path """Retrieve the metadata for a given path

View File

@ -166,7 +166,10 @@ class ProcessorChains:
ftype = "default" ftype = "default"
if ctx and "type" in ctx: if ctx and "type" in ctx:
ftype = ctx["type"] if isinstance(ctx["type"], str):
ftype = ctx["type"]
else:
ftype = ctx["type"][0]
return self.get_chain_for_file(open(filename, "r"), ftype, filename, ctx) return self.get_chain_for_file(open(filename, "r"), ftype, filename, ctx)
def get_chain_for_file( def get_chain_for_file(

View File

@ -1 +1,12 @@
# processors metadata here # processors metadata here
from . import jinja2_page_embed
from . import jinja2
from . import passthrough
from . import process_heck
from . import process_less
from . import process_md
from . import processors
from . import process_pp
from . import process_sass
from . import process_styl

View File

@ -21,7 +21,8 @@ class Jinja2(PassThrough):
Returns: Returns:
iterable: The post-processed output stream iterable: The post-processed output stream
""" """
ctx = cast(Dict, ctx) if (ctx is None):
ctx = {}
template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"]) template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"])
template_env.globals.update(ctx["globals"]) template_env.globals.update(ctx["globals"])
template_env.filters.update(ctx["filters"]) template_env.filters.update(ctx["filters"])

View File

@ -53,7 +53,12 @@ class Jinja2PageEmbed(Processor):
template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"]) template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"])
template_env.globals.update(ctx["globals"]) template_env.globals.update(ctx["globals"])
template_env.filters.update(ctx["filters"]) template_env.filters.update(ctx["filters"])
tmpl = template_env.get_template(ctx["template"]) if isinstance(ctx["template"], str):
tmpl = template_env.get_template(ctx["template"])
else:
# we've got a heck
tmpl = template_env.get_template(ctx["template"][0])
# print(tmpl)
content = "".join([x for x in input_file]) content = "".join([x for x in input_file])
return tmpl.render(content=content, metadata=ctx) return tmpl.render(content=content, metadata=ctx)

View File

@ -1,3 +1,6 @@
"""
Provides various utility functions that are exposed to the templates.
"""
import copy import copy
import datetime import datetime
import glob import glob
@ -15,6 +18,10 @@ from .utils import deep_merge_dicts
def file_list(root: str, listcache: Dict) -> Callable: def file_list(root: str, listcache: Dict) -> Callable:
"""
Return a function (memoized for the cache and root directory) which returns a list of files matching glob and
sorted as required.
"""
def get_file_list( def get_file_list(
path_glob: Union[str, List[str], Tuple[str]], path_glob: Union[str, List[str], Tuple[str]],
*, *,
@ -54,7 +61,7 @@ def file_list(root: str, listcache: Dict) -> Callable:
def file_list_hier(root: str, flist: Callable) -> Callable: def file_list_hier(root: str, flist: Callable) -> Callable:
"""Return a callable which, given a directory, will walk the directory and return the files within """Return a function which, given a directory, will walk the directory and return the files within
it that match the glob passed.""" it that match the glob passed."""
def get_file_list_hier(path: str, glob: str, *, sort_order: str = "ctime", reverse: bool = False) -> Iterable: def get_file_list_hier(path: str, glob: str, *, sort_order: str = "ctime", reverse: bool = False) -> Iterable:
@ -75,6 +82,10 @@ def file_list_hier(root: str, flist: Callable) -> Callable:
def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains, namecache: Dict) -> Callable: def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains, namecache: Dict) -> Callable:
"""
Return a function (memoized for root directory, metatree and processor chains) which returns the output filename
given an input filename based on metadata and said processing chains.
"""
def get_file_name(file_name: str) -> Dict: def get_file_name(file_name: str) -> Dict:
if file_name in namecache: if file_name in namecache:
return namecache[file_name] return namecache[file_name]
@ -87,6 +98,9 @@ def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains,
def file_raw(root: str, contcache: Dict) -> Callable: def file_raw(root: str, contcache: Dict) -> Callable:
"""
Return a function (memoized for the root directory) which returns the raw content of a file.
"""
def get_raw(file_name: str) -> str: def get_raw(file_name: str) -> str:
if file_name in contcache: if file_name in contcache:
return contcache[file_name] return contcache[file_name]
@ -97,6 +111,10 @@ def file_raw(root: str, contcache: Dict) -> Callable:
def file_json(root: str) -> Callable: def file_json(root: str) -> Callable:
"""
Return a function (memoized for the root directory) which loads a file as json, merges it with an optional input dictionary
and returns.
"""
def get_json(file_name: str, parent: Dict = None) -> Dict: def get_json(file_name: str, parent: Dict = None) -> Dict:
outd = {} outd = {}
if parent is not None: if parent is not None:
@ -108,7 +126,27 @@ def file_json(root: str) -> Callable:
return get_json return get_json
def file_heck(root: str) -> Callable:
    """
    Build a HECKformat loader bound to ``root``.

    The returned ``get_heck(file_name, parent=None)`` opens ``file_name``
    relative to ``root`` as UTF-8 text, parses it with
    ``heckformat.parse.load`` and flattens the result with
    ``flatten_replace()``. The flattened data is merged via
    ``deep_merge_dicts`` onto a deep copy of ``parent`` (or onto an empty
    dict when no parent is given), so the caller's ``parent`` is never
    passed to the merge directly.
    """

    def get_heck(file_name: str, parent: Dict = None) -> Dict:
        # Start from a private copy so the merge never sees the caller's dict.
        base = {} if parent is None else copy.deepcopy(parent)
        heck_path = os.path.join(root, file_name)
        with open(heck_path, "r", encoding="utf-8") as heck_file:
            parsed = heckformat.parse.load(heck_file)
            return deep_merge_dicts(base, parsed.flatten_replace())

    return get_heck
def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable: def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable:
"""
Return a function (memoized for the root directory, metatree, and processor chains) which returns the post-processed
content of the input file.
"""
def get_file_content(file_name: str) -> Iterable: def get_file_content(file_name: str) -> Iterable:
if file_name in contcache: if file_name in contcache:
return contcache[file_name] return contcache[file_name]
@ -121,13 +159,25 @@ def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChain
def file_metadata(metatree: MetaTree) -> Callable: def file_metadata(metatree: MetaTree) -> Callable:
"""Returns a function (memoized for a metatree) which returns the meta data for a given file."""
def get_file_metadata(file_name: str) -> Dict: def get_file_metadata(file_name: str) -> Dict:
return metatree.get_metadata(file_name) return metatree.get_metadata(file_name)
return get_file_metadata return get_file_metadata
def containsone(needle: Iterable, haystack: Iterable) -> bool:
    """
    Return True if at least one item of ``needle`` is in ``haystack``.

    Args:
        needle: Items to look for; each is tested with ``in`` against
            ``haystack``.
        haystack: Container searched for each item.

    Returns:
        True as soon as any item of ``needle`` is found in ``haystack``;
        False if none are found or ``needle`` is empty.
    """
    # any() short-circuits on the first hit, like the explicit loop it replaces.
    return any(item in haystack for item in needle)
def time_iso8601(timezone: str) -> Callable: def time_iso8601(timezone: str) -> Callable:
"""Returns a function (memoized for a particular timezone) which formats a time as ISO8601 standard. """
tz = pytz.timezone(timezone) tz = pytz.timezone(timezone)
def get_time_iso8601(time_t: Union[int, float]) -> str: def get_time_iso8601(time_t: Union[int, float]) -> str:
@ -137,6 +187,7 @@ def time_iso8601(timezone: str) -> Callable:
def date_iso8601(timezone: str) -> Callable: def date_iso8601(timezone: str) -> Callable:
"""Returns a function (memoized for a particular timezone) which formats a date as ISO8601 standard. """
tz = pytz.timezone(timezone) tz = pytz.timezone(timezone)
def get_date_iso8601(time_t: Union[int, float]) -> str: def get_date_iso8601(time_t: Union[int, float]) -> str: