Files
heckweasel/heckweasel/__main__.py
Cassowary 690f110bc5 Major cleanup and fixage for new metadata stuff and heckformat
- Clean up a ton of documentation.
- Make the modules import nicely.
- Add a cool logo to the command line tool
- Make the command-line tool use tqdm
- Make the command line tool load the metadata before processing the
  files in a separate loop.
- Fix error handling in the command-line tool processing loops so they
  work correctly (and jinja errors are more useful)
- Make command-line tool exit non-zero if there were errors.
- Fix load metadata to handle formats and errors better (and return {}
  if it fails)
2024-02-27 21:50:03 -08:00

280 lines
10 KiB
Python

"""
HeckWeasel command line interface.
Performs compilation step given an input directory. See --help for more information.
"""
# iterate source tree
# create directories in target tree
# for each item:
# run processor(s) on item, each processor could be in a chain or a branch
# Processors also provide filename munging
# output target based on processor output
import argparse
import logging
import os
import shutil
import sys
import time
import jinja2.exceptions
from pathlib import Path
from typing import Dict, List, cast, Union
import tqdm
from .metadata import MetaTree
from .processchain import ProcessorChains
from .processors.processors import PassthroughException, NoOutputException
from .pygments import pygments_get_css, pygments_markup_contents_html
from .template_tools import (
date_iso8601,
file_content,
file_list,
file_list_hier,
file_json,
file_heck,
file_metadata,
file_name,
file_raw,
time_iso8601,
containsone,
)
from .utils import deep_merge_dicts
from .__init__ import __version__, __copyright__
# Package-wide logger; setup_logging() attaches the console/file handlers to it.
logger = logging.getLogger('heckweasel')
# ASCII-art banner printed by main() at startup; the package version is
# interpolated on the last line.
logo = f"""
Aldercone Studio Collective
_ _ _
| |_ ___ __| |____ __ _____ __ _ ___ ___| |
| ' \/ -_) _| / /\ V V / -_) _` (_-</ -_) |
|_||_\___\__|_\_\ \_/\_/\___\__,_/__/\___|_|
{__version__}
"""
class TqdmLoggingHandler(logging.Handler):
    """
    Logging handler that prints records through ``tqdm.tqdm.write``.

    Routing output via tqdm keeps log lines from clobbering an active
    progress bar.
    """

    def __init__(self, level=logging.NOTSET):
        super().__init__(level=level)

    def emit(self, record):
        """Format ``record``, hand it to tqdm for printing, then flush."""
        try:
            tqdm.tqdm.write(self.format(record))
            self.flush()
        except Exception:
            # Defer to the standard logging error hook rather than raising
            # out of a logging call.
            self.handleError(record)
def setup_logging(verbose:bool=False, quiet:bool=False, logfile:Union[Path, str, None]=None) -> None:
    """
    Attach console (and optionally file) handlers to the package logger.

    Args:
        verbose: Console shows DEBUG and above, using the timestamped format.
            Takes precedence over ``quiet`` when both are set.
        quiet: Console shows CRITICAL only.
        logfile: If given, a file handler at DEBUG level with the timestamped
            format is added as well.
    """
    detailed_format = '%(asctime)s %(module)-12s %(levelname)-8s %(message)s'
    brief_format = '%(levelname)-8s %(message)s'

    # The logger itself lets everything through; each handler filters by its
    # own level.
    logger.setLevel(logging.DEBUG)

    # Pick the console threshold and layout from the flags.
    if verbose:
        console_level, console_format = logging.DEBUG, detailed_format
    elif quiet:
        console_level, console_format = logging.CRITICAL, brief_format
    else:
        console_level, console_format = logging.INFO, brief_format

    # Console output goes through tqdm so progress bars are not clobbered.
    console = TqdmLoggingHandler()
    console.setLevel(console_level)
    console.setFormatter(logging.Formatter(console_format))
    logger.addHandler(console)

    # Optional log file.
    if not logfile:
        return
    file_handler = logging.FileHandler(logfile)
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(detailed_format))
    logger.addHandler(file_handler)
def parse_var(varspec: str) -> List:
    """
    Parse one ``NAME[=VALUE]`` variable definition into a two-item list.

    Only the first ``=`` separates the name from the value, so values that
    themselves contain ``=`` (e.g. ``url=a=b``) are preserved intact.

    Args:
        varspec: The raw definition string, either ``NAME`` or ``NAME=VALUE``.

    Returns:
        ``[NAME, True]`` when no ``=`` is present, otherwise
        ``[NAME, VALUE]`` with ``VALUE`` as a string (possibly empty).
    """
    name, separator, value = varspec.partition('=')
    if not separator:
        # A bare name acts as a boolean flag.
        return [varspec, True]
    return [name, value]
def get_args(args: List[str]) -> argparse.Namespace:
    """
    Parse and validate the command line arguments.

    Args:
        args: The argument strings to parse (without the program name).

    Returns:
        The parsed namespace. ``template`` defaults to ``<root>/templates``
        when not given, and ``excludes`` is set to a list containing the
        template directory. Each ``-D``/``--define`` value is converted by
        ``parse_var``.

    Raises:
        FileNotFoundError: If ``root`` is not an existing directory.
    """
    # The text is the tool's description; passing it positionally would set
    # ``prog`` and make it show up as the program name in usage output.
    parser = argparse.ArgumentParser(description="Compile a Heckweasel directory into an output directory.")
    parser.add_argument("root", help="The root of the heckweasel directory to process.")
    parser.add_argument("output", help="The output directory to export post-compiled files to.")
    parser.add_argument(
        "-c", "--clean", help="Remove the target tree before proceeding (by renaming to .bak).", action="store_true"
    )
    parser.add_argument("-s", "--safe", help="Abort if the target directory already exists.", action="store_true")
    parser.add_argument("-f", "--follow-links", help="Follow symbolic links in the input tree.", action="store_true")
    parser.add_argument("-t", "--template", help="The template directory (default: root/templates)", default=None)
    parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true")
    parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true")
    parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None)
    parser.add_argument(
        "-D", "--define", help="Add a variable to the metadata.", nargs="+", action="extend", type=parse_var)

    result = parser.parse_args(args)

    # validate arguments
    if not os.path.isdir(result.root):
        raise FileNotFoundError("can't find root folder {}".format(result.root))

    if not result.template:
        result.template = os.path.join(result.root, "templates")

    result.excludes = [result.template]

    return result
def main() -> int:
    """
    Run the HeckWeasel command line tool.

    Prints the banner, parses ``sys.argv``, configures logging, and then walks
    the input tree twice: first to load the metadata for every file, then to
    run each file through its processor chain and write the result under the
    output directory. Files ending in ``.meta`` or ``~`` are skipped in both
    passes. Per-file errors are logged and the file is skipped; processing
    continues with the next file.

    Returns:
        0 on success. 1 if the root directory could not be found, if safe mode
        is on and a target directory already exists, or if any error was
        recorded while loading metadata or processing files.
    """
    print(logo)
    try:
        args = get_args(sys.argv[1:])
    except FileNotFoundError as ex:
        # setup_logging() has not run yet, so the package logger has no
        # handlers and an INFO record would not be shown; report on stderr.
        print("error finding arguments: {}".format(ex), file=sys.stderr)
        return 1

    setup_logging(args.verbose)

    # --clean moves an existing output tree aside under a timestamped name.
    if os.path.exists(args.output) and args.clean:
        bak = "{}.bak-{}".format(args.output, int(time.time()))
        logger.info("cleaning target {} -> {}".format(args.output, bak))
        os.rename(args.output, bak)

    process_chains = ProcessorChains(args.processors)

    default_metadata = {
        "templates": args.template,
        "template": "default.jinja2",
        "dir-template": "default-dir.jinja2",
        "filters": {},
        "build-time": time.time(),
        "uuid-oid-root": "heckweasel",
        "summary": "",
        "description": "",
        "author": "",
        "author_email": "",
    }

    # -D/--define entries override or extend the default metadata.
    if args.define:
        for var in args.define:
            default_metadata[var[0]] = var[1]

    meta_tree = MetaTree(args.root, default_metadata)
    file_list_cache = cast(Dict, {})
    file_cont_cache = cast(Dict, {})
    file_name_cache = cast(Dict, {})
    file_raw_cache = cast(Dict, {})
    flist = file_list(args.root, file_list_cache)
    default_metadata["globals"] = {
        "get_file_list": flist,
        "get_hier": file_list_hier(args.root, flist),
        "get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
        "get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
        "get_json": file_json(args.root),
        "get_heck": file_heck(args.root),
        "get_raw": file_raw(args.root, file_raw_cache),
        "get_file_metadata": file_metadata(meta_tree),
        "get_time_iso8601": time_iso8601("UTC"),
        "get_date_iso8601": date_iso8601("UTC"),
        "pygments_get_css": pygments_get_css,
        "pygments_markup_contents_html": pygments_markup_contents_html,
        "merge_dicts": deep_merge_dicts,
        "containsone": containsone,
    }

    # fixme add no-progress option for loop just to be the files
    md = {}
    haderrors = False

    # Pass 1: load the metadata for every file, keyed by root-relative path.
    logger.info("Gathering all metadata")
    for root, _, files in os.walk(args.root, followlinks=args.follow_links):
        workroot = os.path.relpath(root, args.root)
        if workroot == ".":
            workroot = ""
        for f in tqdm.tqdm(files, desc="Gathering metadata", unit="files", dynamic_ncols=True, leave=False):
            # fixme global generic filters
            if f.endswith((".meta", "~")):
                continue
            pth = os.path.join(workroot, f)
            try:
                metadata = meta_tree.get_metadata(pth)
                if args.verbose:
                    logger.debug(f"metadata: {metadata}")
                if pth in md:
                    logger.error("[!] multiple meta? %s", pth)
                    haderrors = True
                md[pth] = metadata
            except Exception as inst:
                # fixme optionally exit on error?
                # Exception (not BaseException) so Ctrl-C and SystemExit
                # still stop the run.
                logger.error(f"[S] Error loading metadata for {pth} Error was: {inst} (skipped)")
                haderrors = True

    # technically metatree has all the md in its cache, but we also have md in a dictionary so who's to say. I guess
    # we should make a separate object that lets you query md.

    # Pass 2: create the output directories and run each file's chain.
    logger.info("Building Webbed Site")
    for root, _, files in os.walk(args.root, followlinks=args.follow_links):
        workroot = os.path.relpath(root, args.root)
        if workroot == ".":
            workroot = ""
        target_dir = os.path.join(args.output, workroot)
        logger.info("[D] Make directory -> {}".format(target_dir))
        if not args.dry_run:
            try:
                os.mkdir(target_dir)
            except FileExistsError:
                # An existing directory is only fatal in safe mode.
                if args.safe:
                    logger.info("[A] Error, target directory exists and we are in safe mode, aborting")
                    return 1

        for f in tqdm.tqdm(files, desc="Building webbed site", unit="files", dynamic_ncols=True, leave=False):
            # fixme global generic filters
            if f.endswith((".meta", "~")):
                continue
            source_path = os.path.join(root, f)
            try:
                metadata = md[os.path.join(workroot, f)]
                chain = process_chains.get_chain_for_filename(source_path, ctx=metadata)
                if args.verbose:
                    logger.debug(f"metadata: {metadata}")
                logger.info("[P] Processing {} -> chains: {} -> output: {}".format(source_path, repr(chain), os.path.join(target_dir, chain.output_filename)))
                if not args.dry_run:
                    try:
                        # normal output
                        # FIXME support binary streams
                        # Collect the whole output before opening the target
                        # so the chain's exceptions fire before the file is
                        # created.
                        collected_output = list(chain.output)
                        with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:
                            outfile.writelines(collected_output)
                    except PassthroughException:
                        # write output from input
                        shutil.copyfile(source_path, os.path.join(target_dir, chain.output_filename))
                    except NoOutputException:
                        # don't write any output
                        logger.warning("[S] No content or output prevented {}".format(source_path))
            except jinja2.exceptions.TemplateSyntaxError as inst:
                logger.error(f"[!][S] Template error processing {f} Error was: {inst.filename}:{inst.lineno} {inst.message} (skipped)")
                haderrors = True
            except Exception as inst:
                # fixme optionally exit on error?
                logger.error(f"[!][S] General error processing {f} Error was: {inst} (skipped)")
                haderrors = True

    if haderrors:
        logger.error("One or more errors in processing.")
        return 1
    return 0
def do_main():
    """Run main() and exit the interpreter with its return value as the status."""
    exit_code = main()
    sys.exit(exit_code)


if __name__ == "__main__":
    do_main()