- Clean up a ton of documentation.
- Make the modules import nicely.
- Add a cool logo to the command line tool
- Make the command-line tool use tqdm
- Make the command line tool load the metadata before processing the
files in a separate loop.
- Fix error handling in the command-line tool processing loops so they
work correctly (and jinja errors are more useful)
- Make command-line tool exit non-zero if there were errors.
- Fix load metadata to handle formats and errors better (and return {}
if it fails)
280 lines
10 KiB
Python
280 lines
10 KiB
Python
"""
|
|
HeckWeasel command line interface.
|
|
|
|
Performs compilation step given an input directory. See --help for more information.
|
|
|
|
"""
|
|
# iterate source tree
|
|
# create directors in target tree
|
|
# for each item:
|
|
# run processor(s) on item, each processor could be in a chain or a branch
|
|
# Processors also provide filename munging
|
|
# output target based on processor output
|
|
|
|
import argparse
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import sys
|
|
import time
|
|
|
|
import jinja2.exceptions
|
|
|
|
from pathlib import Path
|
|
from typing import Dict, List, cast, Union
|
|
|
|
import tqdm
|
|
|
|
from .metadata import MetaTree
|
|
from .processchain import ProcessorChains
|
|
from .processors.processors import PassthroughException, NoOutputException
|
|
from .pygments import pygments_get_css, pygments_markup_contents_html
|
|
from .template_tools import (
|
|
date_iso8601,
|
|
file_content,
|
|
file_list,
|
|
file_list_hier,
|
|
file_json,
|
|
file_heck,
|
|
file_metadata,
|
|
file_name,
|
|
file_raw,
|
|
time_iso8601,
|
|
containsone,
|
|
)
|
|
from .utils import deep_merge_dicts
|
|
from .__init__ import __version__, __copyright__
|
|
|
|
logger = logging.getLogger('heckweasel')
|
|
|
|
logo = f"""
|
|
Aldercone Studio Collective
|
|
_ _ _
|
|
| |_ ___ __| |____ __ _____ __ _ ___ ___| |
|
|
| ' \/ -_) _| / /\ V V / -_) _` (_-</ -_) |
|
|
|_||_\___\__|_\_\ \_/\_/\___\__,_/__/\___|_|
|
|
{__version__}
|
|
"""
|
|
|
|
class TqdmLoggingHandler(logging.Handler):
|
|
"""
|
|
A simple logging wrapper that won't clobber TQDM's progress bar.
|
|
"""
|
|
def __init__(self, level=logging.NOTSET):
|
|
super().__init__(level)
|
|
|
|
def emit(self, record):
|
|
try:
|
|
msg = self.format(record)
|
|
tqdm.tqdm.write(msg)
|
|
self.flush()
|
|
except Exception:
|
|
self.handleError(record)
|
|
|
|
def setup_logging(verbose:bool=False, quiet:bool=False, logfile:Union[Path, str, None]=None) -> None:
|
|
"""
|
|
Configure logging based on some flags.
|
|
"""
|
|
# Setup Tqdm handler
|
|
logger.setLevel(logging.DEBUG)
|
|
h = TqdmLoggingHandler()
|
|
if verbose:
|
|
f = logging.Formatter('%(asctime)s %(module)-12s %(levelname)-8s %(message)s')
|
|
h.setLevel(logging.DEBUG)
|
|
h.setFormatter(f)
|
|
elif quiet:
|
|
f = logging.Formatter('%(levelname)-8s %(message)s')
|
|
h.setLevel(logging.CRITICAL)
|
|
h.setFormatter(f)
|
|
else:
|
|
f = logging.Formatter('%(levelname)-8s %(message)s')
|
|
h.setLevel(logging.INFO)
|
|
h.setFormatter(f)
|
|
logger.addHandler(h)
|
|
|
|
# setup logfile if specified
|
|
if logfile:
|
|
lf = logging.FileHandler(logfile)
|
|
lf.setLevel(logging.DEBUG)
|
|
lf.setFormatter(logging.Formatter('%(asctime)s %(module)-12s %(levelname)-8s %(message)s'))
|
|
logger.addHandler(lf)
|
|
|
|
def parse_var(varspec: str) -> List:
|
|
if (not ('=' in varspec)):
|
|
return [varspec, True]
|
|
return list(varspec.split('=', 2))
|
|
|
|
|
|
def get_args(args: List[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser("Compile a Heckweasel directory into an output directory.")
|
|
|
|
parser.add_argument("root", help="The root of the heckweasel directory to process.")
|
|
parser.add_argument("output", help="The output directory to export post-compiled files to.")
|
|
|
|
parser.add_argument(
|
|
"-c", "--clean", help="Remove the target tree before proceeding (by renaming to .bak).", action="store_true"
|
|
)
|
|
parser.add_argument("-s", "--safe", help="Abort if the target directory already exists.", action="store_true")
|
|
parser.add_argument("-f", "--follow-links", help="Follow symbolic links in the input tree.", action="store_true")
|
|
parser.add_argument("-t", "--template", help="The template directory (default: root/templates)", default=None)
|
|
parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true")
|
|
parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true")
|
|
parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None)
|
|
parser.add_argument(
|
|
"-D", "--define", help="Add a variable to the metadata.", nargs="+", action="extend", type=parse_var)
|
|
result = parser.parse_args(args)
|
|
# validate arguments
|
|
if not os.path.isdir(result.root):
|
|
raise FileNotFoundError("can't find root folder {}".format(result.root))
|
|
|
|
if not result.template:
|
|
result.template = os.path.join(result.root, "templates")
|
|
result.excludes = [result.template]
|
|
|
|
return result
|
|
|
|
|
|
def main() -> int:
|
|
print(logo)
|
|
|
|
try:
|
|
args = get_args(sys.argv[1:])
|
|
except FileNotFoundError as ex:
|
|
logger.info("error finding arguments: {}".format(ex))
|
|
return 1
|
|
setup_logging(args.verbose)
|
|
if os.path.exists(args.output) and args.clean:
|
|
bak = "{}.bak-{}".format(args.output, int(time.time()))
|
|
logger.info("cleaning target {} -> {}".format(args.output, bak))
|
|
os.rename(args.output, bak)
|
|
|
|
process_chains = ProcessorChains(args.processors)
|
|
|
|
default_metadata = {
|
|
"templates": args.template,
|
|
"template": "default.jinja2",
|
|
"dir-template": "default-dir.jinja2",
|
|
"filters": {},
|
|
"build-time": time.time(),
|
|
"uuid-oid-root": "heckweasel",
|
|
"summary": "",
|
|
"description": "",
|
|
"author": "",
|
|
"author_email": "",
|
|
}
|
|
|
|
if args.define:
|
|
for var in args.define:
|
|
default_metadata[var[0]] = var[1]
|
|
|
|
|
|
meta_tree = MetaTree(args.root, default_metadata)
|
|
file_list_cache = cast(Dict, {})
|
|
file_cont_cache = cast(Dict, {})
|
|
file_name_cache = cast(Dict, {})
|
|
file_raw_cache = cast(Dict, {})
|
|
flist = file_list(args.root, file_list_cache)
|
|
|
|
default_metadata["globals"] = {
|
|
"get_file_list": flist,
|
|
"get_hier": file_list_hier(args.root, flist),
|
|
"get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
|
|
"get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
|
|
"get_json": file_json(args.root),
|
|
"get_heck": file_heck(args.root),
|
|
"get_raw": file_raw(args.root, file_raw_cache),
|
|
"get_file_metadata": file_metadata(meta_tree),
|
|
"get_time_iso8601": time_iso8601("UTC"),
|
|
"get_date_iso8601": date_iso8601("UTC"),
|
|
"pygments_get_css": pygments_get_css,
|
|
"pygments_markup_contents_html": pygments_markup_contents_html,
|
|
"merge_dicts": deep_merge_dicts,
|
|
"containsone": containsone,
|
|
}
|
|
|
|
# fixme add no-progress option for loop just to be the files
|
|
|
|
md = {}
|
|
haderrors = False
|
|
logger.info("Gathering all metadata")
|
|
for root, _, files in os.walk(args.root, followlinks=args.follow_links):
|
|
workroot = os.path.relpath(root, args.root)
|
|
if workroot == ".":
|
|
workroot = ""
|
|
for f in tqdm.tqdm(files, desc="Gathering metadata", unit="files", dynamic_ncols=True, leave=False):
|
|
try:
|
|
# fixme global generic filters
|
|
if f.endswith(".meta") or f.endswith("~"):
|
|
continue
|
|
pth = os.path.join(workroot, f)
|
|
metadata = meta_tree.get_metadata(pth)
|
|
if args.verbose:
|
|
logger.debug(f"metadata: {metadata}")
|
|
if pth in md:
|
|
logger.error("[!] multiple meta? ", pth)
|
|
haderrors = True
|
|
md[pth] = metadata
|
|
except BaseException as inst:
|
|
# fixme optionally exit on error?
|
|
logger.error(f"[S] Error loading metadata for {pth} Error was: {inst} (skipped)")
|
|
|
|
# technically metatree has all the md in its cache, but we also have md in a dictionary so who's to say. I guess
|
|
# we should make a separate object that lets you query md.
|
|
|
|
logger.info("Building Webbed Site")
|
|
for root, _, files in os.walk(args.root, followlinks=args.follow_links):
|
|
workroot = os.path.relpath(root, args.root)
|
|
if workroot == ".":
|
|
workroot = ""
|
|
target_dir = os.path.join(args.output, workroot)
|
|
logger.info("[D] Make directory -> {}".format(target_dir))
|
|
if not args.dry_run:
|
|
try:
|
|
os.mkdir(target_dir)
|
|
except FileExistsError:
|
|
if args.safe:
|
|
logger.info("[A] Error, target directory exists and we are in safe mode, aborting")
|
|
return 1
|
|
for f in tqdm.tqdm(files, desc="Building webbed site", unit="files", dynamic_ncols=True, leave=False):
|
|
# fixme global generic filters
|
|
try:
|
|
if f.endswith(".meta") or f.endswith("~"):
|
|
continue
|
|
metadata = md[os.path.join(workroot, f)]
|
|
chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata)
|
|
if args.verbose:
|
|
logger.debug(f"metadata: {metadata}")
|
|
logger.info("[P] Processing {} -> chains: {} -> output: {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename)))
|
|
if not args.dry_run:
|
|
try:
|
|
# normal output
|
|
# FIXME support binary streams
|
|
collected_output = [line for line in chain.output]
|
|
with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:
|
|
outfile.writelines(collected_output)
|
|
except PassthroughException:
|
|
# write output from input
|
|
shutil.copyfile(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))
|
|
except NoOutputException:
|
|
logger.warn("[S] No content or output prevented {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename)))
|
|
# don't write anyp output
|
|
pass
|
|
except jinja2.exceptions.TemplateSyntaxError as inst:
|
|
logger.error(f"[!][S] Template error processing {f} Error was: {inst.filename}:{inst.lineno} {inst.message} (skipped)")
|
|
haderrors = True
|
|
except BaseException as inst:
|
|
# fixme optionally exit on error?
|
|
logger.error(f"[!][S] General error processing {f} Error was: {inst} (skipped)")
|
|
haderrors = True
|
|
|
|
if haderrors:
|
|
logger.error("One or more errors in processing.")
|
|
return 1
|
|
return 0
|
|
|
|
def do_main():
|
|
sys.exit(main())
|
|
|
|
if __name__ == "__main__":
|
|
do_main()
|