Initial chekin post-discontinuity.
This commit is contained in:
1
heckweasel/__init__.py
Normal file
1
heckweasel/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
__version__ = '0.7.0'
|
||||
157
heckweasel/__main__.py
Normal file
157
heckweasel/__main__.py
Normal file
@ -0,0 +1,157 @@
|
||||
# iterate source tree
|
||||
# create directors in target tree
|
||||
# for each item:
|
||||
# run processor(s) on item, each processor could be in a chain or a branch
|
||||
# Processors also provide filename munging
|
||||
# output target based on processor output
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import time
|
||||
from typing import Dict, List, cast
|
||||
|
||||
from .metadata import MetaTree
|
||||
from .processchain import ProcessorChains
|
||||
from .processors.processors import PassthroughException
|
||||
from .pygments import pygments_get_css, pygments_markup_contents_html
|
||||
from .template_tools import (
|
||||
date_iso8601,
|
||||
file_content,
|
||||
file_list,
|
||||
file_list_hier,
|
||||
file_json,
|
||||
file_metadata,
|
||||
file_name,
|
||||
file_raw,
|
||||
time_iso8601,
|
||||
)
|
||||
from .utils import deep_merge_dicts
|
||||
|
||||
logger = logging.getLogger()
|
||||
|
||||
|
||||
def setup_logging(verbose: bool = False) -> None:
|
||||
pass
|
||||
|
||||
|
||||
def parse_var(varspec: str) -> List:
|
||||
if (not ('=' in varspec)):
|
||||
return [varspec, True]
|
||||
return list(varspec.split('=', 2))
|
||||
|
||||
|
||||
def get_args(args: List[str]) -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser("Compile a Heckweasel directory into an output directory.")
|
||||
|
||||
parser.add_argument("root", help="The root of the heckweasel directory to process.")
|
||||
parser.add_argument("output", help="The output directory to export post-compiled files to.")
|
||||
|
||||
parser.add_argument(
|
||||
"-c", "--clean", help="Remove the target tree before proceeding (by renaming to .bak).", action="store_true"
|
||||
)
|
||||
parser.add_argument("-s", "--safe", help="Abort if the target directory already exists.", action="store_true")
|
||||
parser.add_argument("-f", "--follow-links", help="Follow symbolic links in the input tree.", action="store_true")
|
||||
parser.add_argument("-t", "--template", help="The template directory (default: root/templates)", default=None)
|
||||
parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true")
|
||||
parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true")
|
||||
parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None)
|
||||
parser.add_argument(
|
||||
"-D", "--define", help="Add a variable to the metadata.", nargs="+", action="extend", type=parse_var)
|
||||
result = parser.parse_args(args)
|
||||
# validate arguments
|
||||
if not os.path.isdir(result.root):
|
||||
raise FileNotFoundError("can't find root folder {}".format(result.root))
|
||||
|
||||
if not result.template:
|
||||
result.template = os.path.join(result.root, "templates")
|
||||
result.excludes = [result.template]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main() -> int:
|
||||
try:
|
||||
args = get_args(sys.argv[1:])
|
||||
except FileNotFoundError as ex:
|
||||
print("error finding arguments: {}".format(ex))
|
||||
return 1
|
||||
setup_logging(args.verbose)
|
||||
if os.path.exists(args.output) and args.clean:
|
||||
bak = "{}.bak-{}".format(args.output, int(time.time()))
|
||||
print("cleaning target {} -> {}".format(args.output, bak))
|
||||
os.rename(args.output, bak)
|
||||
|
||||
process_chains = ProcessorChains(args.processors)
|
||||
|
||||
default_metadata = {
|
||||
"templates": args.template,
|
||||
"template": "default.jinja2",
|
||||
"dir-template": "default-dir.jinja2",
|
||||
"filters": {},
|
||||
"build-time": time.time(),
|
||||
"uuid-oid-root": "heckweasel",
|
||||
"summary": "",
|
||||
"description": "",
|
||||
"author": "",
|
||||
"author_email": "",
|
||||
}
|
||||
if args.define:
|
||||
for var in args.define:
|
||||
default_metadata[var[0]] = var[1]
|
||||
meta_tree = MetaTree(args.root, default_metadata)
|
||||
file_list_cache = cast(Dict, {})
|
||||
file_cont_cache = cast(Dict, {})
|
||||
file_name_cache = cast(Dict, {})
|
||||
file_raw_cache = cast(Dict, {})
|
||||
flist = file_list(args.root, file_list_cache)
|
||||
default_metadata["globals"] = {
|
||||
"get_file_list": flist,
|
||||
"get_hier": file_list_hier(args.root, flist),
|
||||
"get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
|
||||
"get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
|
||||
"get_json": file_json(args.root),
|
||||
"get_raw": file_raw(args.root, file_raw_cache),
|
||||
"get_file_metadata": file_metadata(meta_tree),
|
||||
"get_time_iso8601": time_iso8601("UTC"),
|
||||
"get_date_iso8601": date_iso8601("UTC"),
|
||||
"pygments_get_css": pygments_get_css,
|
||||
"pygments_markup_contents_html": pygments_markup_contents_html,
|
||||
"merge_dicts": deep_merge_dicts,
|
||||
}
|
||||
|
||||
for root, _, files in os.walk(args.root, followlinks=args.follow_links):
|
||||
workroot = os.path.relpath(root, args.root)
|
||||
if workroot == ".":
|
||||
workroot = ""
|
||||
target_dir = os.path.join(args.output, workroot)
|
||||
print("mkdir -> {}".format(target_dir))
|
||||
if not args.dry_run:
|
||||
try:
|
||||
os.mkdir(target_dir)
|
||||
except FileExistsError:
|
||||
if args.safe:
|
||||
print("error, target directory exists, aborting")
|
||||
return 1
|
||||
for f in files:
|
||||
# fixme global generic filters
|
||||
if f.endswith(".meta") or f.endswith("~"):
|
||||
continue
|
||||
metadata = meta_tree.get_metadata(os.path.join(workroot, f))
|
||||
chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata)
|
||||
print("process {} -> {} -> {}".format(os.path.join(root, f), repr(chain), os.path.join(target_dir, chain.output_filename)))
|
||||
if not args.dry_run:
|
||||
try:
|
||||
with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:
|
||||
for line in chain.output:
|
||||
outfile.write(line)
|
||||
except PassthroughException:
|
||||
shutil.copyfile(os.path.join(root, f), os.path.join(target_dir, chain.output_filename))
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
111
heckweasel/defaults/chains.yaml
Normal file
111
heckweasel/defaults/chains.yaml
Normal file
@ -0,0 +1,111 @@
|
||||
# Default: output == input
|
||||
default:
|
||||
extension: default
|
||||
chain:
|
||||
- passthrough
|
||||
|
||||
# Any object that needs jinja scripts but no other explicit processing
|
||||
templatable:
|
||||
extension: null
|
||||
chain:
|
||||
- jinja2
|
||||
|
||||
# Any object that needs jinja and to be embedded in a parent template
|
||||
tembed:
|
||||
extension: null
|
||||
chain:
|
||||
- jinja2
|
||||
- jinja2_page_embed
|
||||
|
||||
# Markdown, BBCode and RST are first run through the templater, and then
|
||||
# they are processed into HTML, and finally embedded in a page template.
|
||||
markdown:
|
||||
extension:
|
||||
- md
|
||||
chain:
|
||||
- jinja2
|
||||
- process_md
|
||||
- jinja2_page_embed
|
||||
bbcode:
|
||||
extension:
|
||||
- bb
|
||||
- pp
|
||||
chain:
|
||||
- jinja2
|
||||
- process_pp
|
||||
- jinja2_page_embed
|
||||
# FIXME implement RST processor
|
||||
# restructured:
|
||||
# extension:
|
||||
# - rst
|
||||
# chain:
|
||||
# - jinja2
|
||||
# - process_rst
|
||||
# - jinja2_page_embed
|
||||
|
||||
# # JSON and YAML are split, passed through a pretty printer, and then output
|
||||
# FIXME implement split chain processor, implement processor arguments
|
||||
# json:
|
||||
# extension:
|
||||
# - json
|
||||
# chain:
|
||||
# - split (passthrough)
|
||||
# - pp_json
|
||||
# yaml:
|
||||
# extension:
|
||||
# - yml
|
||||
# - yaml
|
||||
# chain:
|
||||
# - split (passthrough)
|
||||
# - pp_yaml
|
||||
|
||||
# Template-html is first passed through the templater, and then embedded
|
||||
# in a page template
|
||||
template-html:
|
||||
extension:
|
||||
- thtml
|
||||
- cont
|
||||
chain:
|
||||
- jinja2
|
||||
- jinja2_page_embed
|
||||
|
||||
# # Smart CSS are simply converted to CSS.
|
||||
# sass:
|
||||
# extension:
|
||||
# - sass
|
||||
# - scss
|
||||
# chain:
|
||||
# - process_sass
|
||||
# less:
|
||||
# extension:
|
||||
# - less
|
||||
# chain:
|
||||
# - process_less
|
||||
|
||||
# stylus:
|
||||
# extension:
|
||||
# - styl
|
||||
# chain:
|
||||
# - process_styl
|
||||
|
||||
# # Images are processed into thumbnails and sized in addition to being retained as their original
|
||||
# FIXME implement split chain processor, implement processor arguments,
|
||||
# image:
|
||||
# extension:
|
||||
# - jpg
|
||||
# - jpeg
|
||||
# - png
|
||||
# chain:
|
||||
# - split (image_bigthumb)
|
||||
# - split (image_smallthumb)
|
||||
# - passthrough
|
||||
|
||||
# image_bigthumb:
|
||||
# extension:
|
||||
# chain:
|
||||
# - smart_resize (big)
|
||||
|
||||
# image_smallthumb:
|
||||
# extension:
|
||||
# chain:
|
||||
# - smart_resize (small)
|
||||
153
heckweasel/metadata.py
Normal file
153
heckweasel/metadata.py
Normal file
@ -0,0 +1,153 @@
|
||||
"""Constructs a tree-like object containing the metadata for a given path, and caches said metadata."""
|
||||
|
||||
import fnmatch
|
||||
import logging
|
||||
import mimetypes
|
||||
import os
|
||||
import uuid
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union, cast
|
||||
|
||||
import jstyleson
|
||||
|
||||
from .utils import guess_mime
|
||||
|
||||
# setup mimetypes with some extra ones
|
||||
mimetypes.init()
|
||||
mimetypes.add_type("text/html", "thtml")
|
||||
mimetypes.add_type("text/html", "cont")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class MetaCacheMiss(Exception):
|
||||
"""Raised on cache miss."""
|
||||
|
||||
|
||||
class MetaCache:
|
||||
"""This class provides an in-memory cache for metadata tree."""
|
||||
|
||||
def __init__(self, max_age: float = 200.0):
|
||||
"""Initialize the cache.
|
||||
|
||||
Arguments:
|
||||
max_age (int): the number of seconds to age-out cache items
|
||||
|
||||
"""
|
||||
self._max_age = max_age
|
||||
self._cache: Dict[str, Tuple[float, Any]] = {}
|
||||
|
||||
def get(self, key: str, new_time_stamp: float) -> Any:
|
||||
"""Get an item from the cache.
|
||||
|
||||
Arguments:
|
||||
key (str): the cache key to retieve
|
||||
new_time_stamp (int): The time to use to compare the stored time with
|
||||
|
||||
Returns:
|
||||
:obj:misc: The previously stored value.
|
||||
|
||||
Raises:
|
||||
MetaCacheMiss: on missing key, or on aged out
|
||||
|
||||
"""
|
||||
if key not in self._cache:
|
||||
raise MetaCacheMiss("no item for key {}".format(key))
|
||||
|
||||
if self._cache[key][0] + self._max_age <= new_time_stamp:
|
||||
return self._cache[key][1]
|
||||
|
||||
raise MetaCacheMiss("cache expired for key {}".format(key))
|
||||
|
||||
def put(self, key: str, value: Union[Dict, List, int, str, object], time_stamp: float) -> None:
|
||||
"""Put an item into the cache.
|
||||
|
||||
Arguments:
|
||||
key (str): the key to store the cache item under
|
||||
value (:obj:misc): the value to store
|
||||
time_stamp (float): the time stamp to store the item under
|
||||
|
||||
"""
|
||||
self._cache[key] = (time_stamp, value)
|
||||
|
||||
|
||||
class MetaTree:
|
||||
"""This provides an interface to loading and caching tree metadata for a given directory tree."""
|
||||
|
||||
def __init__(self, root: str, default_metadata: Optional[Dict] = None):
|
||||
"""Initialize the metadata tree object.
|
||||
|
||||
Arguments:
|
||||
root (str): The path to the root of the file tree to operate on.
|
||||
default_metadata (dict, optional): The default metadata to apply to the tree
|
||||
|
||||
"""
|
||||
self._cache = MetaCache()
|
||||
if default_metadata is None:
|
||||
default_metadata = {}
|
||||
self._default_metadata = default_metadata
|
||||
if root[-1] != "/":
|
||||
root += "/"
|
||||
self._root = root
|
||||
|
||||
def get_metadata(self, rel_path: str) -> Dict:
|
||||
"""Retrieve the metadata for a given path
|
||||
|
||||
The general procedure is to iterate the tree, at each level
|
||||
load .meta (JSON formatted dictionary) for that level, and
|
||||
then finally load the path.meta, and merge these dictionaries
|
||||
in descendant order.
|
||||
|
||||
Arguments:
|
||||
rel_path (str): The path to retrieve the metadata for (relative to root)
|
||||
|
||||
Returns:
|
||||
dict: A dictionary of metadata for that path tree.
|
||||
|
||||
"""
|
||||
metablob = dict(self._default_metadata)
|
||||
# iterate path components from root to target path
|
||||
comps = [self._root] + rel_path.split("/")
|
||||
fullpath = ""
|
||||
ospath = os.path.join(self._root, rel_path)
|
||||
for pth in comps:
|
||||
fullpath = os.path.join(fullpath, pth)
|
||||
st = os.stat(fullpath)
|
||||
|
||||
if os.path.isdir(fullpath):
|
||||
cachekey = os.path.join(fullpath, ".meta")
|
||||
else:
|
||||
cachekey = fullpath + ".meta"
|
||||
meta = cast(Dict, {})
|
||||
try:
|
||||
st_meta = os.stat(cachekey)
|
||||
meta = self._cache.get(cachekey, st_meta.st_mtime)
|
||||
except FileNotFoundError:
|
||||
st_meta = None # type: ignore
|
||||
except MetaCacheMiss:
|
||||
meta = {}
|
||||
|
||||
if not meta and st_meta:
|
||||
meta = jstyleson.load(open(cachekey, "r"))
|
||||
self._cache.put(cachekey, meta, st_meta.st_mtime)
|
||||
|
||||
if fullpath == ospath and "wildcard_metadata" in metablob:
|
||||
for wild in metablob["wildcard_metadata"]:
|
||||
if fnmatch.fnmatch(pth, wild[0]):
|
||||
metablob.update(wild[1])
|
||||
|
||||
metablob.update(meta)
|
||||
|
||||
# return final dict
|
||||
metablob["dir"], metablob["file_name"] = os.path.split(rel_path)
|
||||
metablob["file_path"] = rel_path
|
||||
metablob["relpath"] = os.path.relpath("/", "/" + metablob["dir"])
|
||||
metablob["uuid"] = uuid.uuid3(uuid.NAMESPACE_OID, metablob["uuid-oid-root"] + ospath)
|
||||
metablob["os-path"], _ = os.path.split(fullpath)
|
||||
metablob["guessed-type"] = guess_mime(ospath)
|
||||
if "mime-type" not in metablob:
|
||||
metablob["mime-type"] = metablob["guessed-type"]
|
||||
metablob["stat"] = {}
|
||||
for stk in ("st_mtime", "st_ctime", "st_atime", "st_mode", "st_size", "st_ino"):
|
||||
metablob["stat"][stk.replace("st_", "")] = getattr(st, stk)
|
||||
|
||||
return metablob
|
||||
182
heckweasel/processchain.py
Normal file
182
heckweasel/processchain.py
Normal file
@ -0,0 +1,182 @@
|
||||
"""Interface for chains of processors"""
|
||||
|
||||
import os
|
||||
import os.path
|
||||
import random
|
||||
from typing import Any, Dict, Iterable, List, Optional, Type, cast
|
||||
|
||||
import yaml
|
||||
|
||||
from .processors.processors import Processor
|
||||
|
||||
|
||||
class ProcessorChain:
|
||||
"""This implements a wrapper for an arbitrary set of processors and an associated file stream."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
processors: List[Processor],
|
||||
file_name: str,
|
||||
file_data: Iterable[str],
|
||||
file_type: str,
|
||||
ctx: Optional[Dict] = None,
|
||||
):
|
||||
"""Initialize the processing stream.
|
||||
|
||||
Arguments:
|
||||
processors (list): A list of processor objects.
|
||||
file_data (Iterable): An iterable from which to retrieve the input
|
||||
file_type (str): the specified file type for consumer information.
|
||||
|
||||
"""
|
||||
self._processors = processors
|
||||
self._file_data = file_data
|
||||
self._file_type = file_type
|
||||
self._file_name = file_name
|
||||
self._ctx: Dict = {}
|
||||
if ctx is not None:
|
||||
self._ctx = cast(Dict, ctx)
|
||||
|
||||
@property
|
||||
def output(self) -> Iterable:
|
||||
"""Return an iterable for the output of the process chain
|
||||
|
||||
Returns:
|
||||
:obj:'iterable': the iterable
|
||||
|
||||
"""
|
||||
prev = self._file_data
|
||||
for processor in self._processors:
|
||||
if processor:
|
||||
prev = processor.process(prev, self._ctx)
|
||||
|
||||
return prev
|
||||
|
||||
@property
|
||||
def output_mime(self) -> str:
|
||||
"""Return the post-processed MIME value from the processing chain
|
||||
|
||||
Returns:
|
||||
str: the mime type
|
||||
|
||||
"""
|
||||
fname = self._file_name
|
||||
for processor in self._processors:
|
||||
fname = processor.mime_type(fname, self._ctx)
|
||||
return fname
|
||||
|
||||
@property
|
||||
def output_ext(self) -> str:
|
||||
"""Return the post-processed extension from the processing chain
|
||||
|
||||
Returns:
|
||||
str: the extension
|
||||
"""
|
||||
fname = self._file_name
|
||||
for processor in self._processors:
|
||||
fname = processor.extension(fname, self._ctx)
|
||||
return fname
|
||||
|
||||
@property
|
||||
def output_filename(self) -> str:
|
||||
"""Return the post-processed filename from the processing chain
|
||||
|
||||
Returns:
|
||||
str: the new filename
|
||||
|
||||
"""
|
||||
fname = os.path.basename(self._file_name)
|
||||
for processor in self._processors:
|
||||
fname = processor.filename(fname, self._ctx)
|
||||
return fname
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return "[" + ",".join([x.__class__.__name__ for x in self._processors]) + "]"
|
||||
|
||||
|
||||
class ProcessorChains:
|
||||
"""Load a configuration for processor chains, and provide ability to process the chains given a particular input
|
||||
file.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Optional[str] = None):
|
||||
"""Initialize, with a specified configuration file
|
||||
|
||||
Arguments:
|
||||
config (str, optional): The path to a yaml formatted configuration file.
|
||||
|
||||
"""
|
||||
if config is None: # pragma: no coverage
|
||||
config = os.path.join(os.path.dirname(__file__), "defaults", "chains.yaml")
|
||||
|
||||
self.chainconfig = yaml.load(open(config, "r"))
|
||||
self.extensionmap: Dict[str, Any] = {}
|
||||
self.processors: Dict[str, Type[Processor]] = {}
|
||||
for ch, conf in self.chainconfig.items():
|
||||
if conf["extension"] == "default":
|
||||
self.default = ch
|
||||
else:
|
||||
if conf["extension"]:
|
||||
for ex in conf["extension"]:
|
||||
if ex in self.extensionmap or ex is None:
|
||||
# log an error or except or something we'll just override for now.
|
||||
pass
|
||||
self.extensionmap[ex] = ch
|
||||
for pr in conf["chain"]:
|
||||
if pr in self.processors:
|
||||
continue
|
||||
processor_module = __import__("processors", globals(), locals(), [pr], 1)
|
||||
self.processors[pr] = processor_module.__dict__[pr].processor
|
||||
|
||||
def get_chain_for_filename(self, filename: str, ctx: Optional[Dict] = None) -> ProcessorChain:
|
||||
"""Get the ProcessorChain, as configured for a given file by extension.
|
||||
|
||||
Arguments:
|
||||
filename (str): The name of the file to get a chain for.
|
||||
|
||||
Returns:
|
||||
ProcessorChain: the constructed processor chain.
|
||||
"""
|
||||
r = filename.rsplit(".", 1)
|
||||
ftype = "default"
|
||||
if r:
|
||||
ftype = r[-1]
|
||||
if ctx and "pragma" in ctx:
|
||||
if "no-proc" in ctx["pragma"]:
|
||||
ftype = "default"
|
||||
|
||||
if ctx and "type" in ctx:
|
||||
ftype = ctx["type"]
|
||||
return self.get_chain_for_file(open(filename, "r"), ftype, filename, ctx)
|
||||
|
||||
def get_chain_for_file(
|
||||
self, file_obj: Iterable, file_ext: str, file_name: Optional[str] = None, ctx: Optional[Dict] = None
|
||||
) -> ProcessorChain:
|
||||
"""Get the ProcessorChain for a given iterable object based on the specified file type
|
||||
|
||||
Arguments:
|
||||
file_obj (:obj:`iterable`): The input file stream
|
||||
file_ext (str): The type (extension) of the input stream
|
||||
|
||||
Returns:
|
||||
ProcessorChain: the constructed processor chain.
|
||||
|
||||
"""
|
||||
if file_ext not in self.extensionmap or not self.extensionmap[file_ext]:
|
||||
if file_ext in self.chainconfig:
|
||||
file_type = file_ext
|
||||
else:
|
||||
file_type = "default"
|
||||
else:
|
||||
file_type = self.extensionmap[file_ext]
|
||||
|
||||
if not (bool(file_name)):
|
||||
file_name = hex(random.randint(0, 65536))
|
||||
|
||||
return ProcessorChain(
|
||||
[self.processors[x]() for x in self.chainconfig[file_type]["chain"]],
|
||||
cast(str, file_name),
|
||||
file_obj,
|
||||
file_type,
|
||||
ctx,
|
||||
)
|
||||
1
heckweasel/processors/__init__.py
Normal file
1
heckweasel/processors/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
# processors metadata here
|
||||
31
heckweasel/processors/jinja2.py
Normal file
31
heckweasel/processors/jinja2.py
Normal file
@ -0,0 +1,31 @@
|
||||
"""Define a Jinja2 Processor which applies programmable templating to the input stream."""
|
||||
|
||||
from typing import Dict, Iterable, Optional, cast
|
||||
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
from .passthrough import PassThrough
|
||||
|
||||
|
||||
class Jinja2(PassThrough):
|
||||
"""Pass the input stream through Jinja2 for scritable templating."""
|
||||
|
||||
def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
|
||||
"""Return an iterable object of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
input_file (iterable): An input stream
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
|
||||
Returns:
|
||||
iterable: The post-processed output stream
|
||||
"""
|
||||
ctx = cast(Dict, ctx)
|
||||
template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"])
|
||||
template_env.globals.update(ctx["globals"])
|
||||
template_env.filters.update(ctx["filters"])
|
||||
tmpl = template_env.from_string("".join([x for x in input_file]))
|
||||
return tmpl.render(metadata=ctx)
|
||||
|
||||
processor = Jinja2
|
||||
74
heckweasel/processors/jinja2_page_embed.py
Normal file
74
heckweasel/processors/jinja2_page_embed.py
Normal file
@ -0,0 +1,74 @@
|
||||
"""Define a Jinja2 processor which embeds the (presumably HTML) input stream into a Page Template
|
||||
as defined in the ctx metadata (the ``content`` variable is assigned to the input stream and
|
||||
the target template is rendered)."""
|
||||
|
||||
import os
|
||||
from typing import Dict, Iterable, Optional, cast
|
||||
|
||||
from jinja2 import Environment, FileSystemLoader
|
||||
|
||||
from .processors import Processor
|
||||
|
||||
|
||||
class Jinja2PageEmbed(Processor):
|
||||
"""Embed input stream as ``content`` variable in page template defined in context key ``template``."""
|
||||
|
||||
def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the filename of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the previous name for the file.
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new name for the file
|
||||
|
||||
"""
|
||||
return os.path.splitext(oldname)[0] + "." + self.extension(oldname, ctx)
|
||||
|
||||
def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new mimetype of the file after processing
|
||||
|
||||
"""
|
||||
return ctx.get("mime", "text/html")
|
||||
|
||||
def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
|
||||
"""Return an iterable object of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
input_file (iterable): An input stream
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
iterable: The post-processed output stream
|
||||
"""
|
||||
ctx = cast(Dict, ctx)
|
||||
template_env = Environment(loader=FileSystemLoader(ctx["templates"]), extensions=["jinja2.ext.do"])
|
||||
template_env.globals.update(ctx["globals"])
|
||||
template_env.filters.update(ctx["filters"])
|
||||
tmpl = template_env.get_template(ctx["template"])
|
||||
content = "".join([x for x in input_file])
|
||||
return tmpl.render(content=content, metadata=ctx)
|
||||
|
||||
def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new extension of the file after processing
|
||||
|
||||
"""
|
||||
return ctx.get("extension", "html")
|
||||
|
||||
|
||||
processor = Jinja2PageEmbed
|
||||
68
heckweasel/processors/passthrough.py
Normal file
68
heckweasel/processors/passthrough.py
Normal file
@ -0,0 +1,68 @@
|
||||
"""Passthrough progcessor which takes input and returns it."""
|
||||
|
||||
import os
|
||||
from typing import Dict, Iterable, Optional, cast
|
||||
|
||||
from ..utils import guess_mime
|
||||
from .processors import PassthroughException, Processor
|
||||
|
||||
|
||||
class PassThrough(Processor):
|
||||
"""A simple passthrough processor that takes input and sends it to output."""
|
||||
|
||||
def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the filename of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the previous name for the file.
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new name for the file
|
||||
|
||||
"""
|
||||
return oldname
|
||||
|
||||
def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new mimetype of the file after processing
|
||||
|
||||
"""
|
||||
result = cast(str, guess_mime(oldname))
|
||||
if result == "directory":
|
||||
result = "DIR"
|
||||
return result
|
||||
|
||||
def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
|
||||
"""Return an iterable object of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
input_file (iterable): An input stream
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
iterable: The post-processed output stream
|
||||
"""
|
||||
raise PassthroughException("passthrough")
|
||||
|
||||
def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new extension of the file after processing
|
||||
|
||||
"""
|
||||
return os.path.splitext(oldname)[-1]
|
||||
|
||||
|
||||
processor = PassThrough
|
||||
1
heckweasel/processors/process_less.py
Normal file
1
heckweasel/processors/process_less.py
Normal file
@ -0,0 +1 @@
|
||||
processor = None
|
||||
68
heckweasel/processors/process_md.py
Normal file
68
heckweasel/processors/process_md.py
Normal file
@ -0,0 +1,68 @@
|
||||
"""Convert an MD stream into an HTML stream"""
|
||||
|
||||
import io
|
||||
import os
|
||||
from typing import Dict, Iterable, Optional
|
||||
|
||||
import markdown
|
||||
|
||||
from .processors import Processor
|
||||
|
||||
|
||||
class MarkdownProcessor(Processor):
|
||||
"""Convert an MD stream into an HTML stream"""
|
||||
|
||||
def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the filename of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the previous name for the file.
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new name for the file
|
||||
|
||||
"""
|
||||
return os.path.splitext(oldname)[0] + ".html"
|
||||
|
||||
def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new mimetype of the file after processing
|
||||
|
||||
"""
|
||||
return "text/html"
|
||||
|
||||
def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new extension of the file after processing
|
||||
|
||||
"""
|
||||
return "html"
|
||||
|
||||
def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
|
||||
"""Return an iterable object of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
input_file (iterable): An input stream
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
iterable: The post-processed output stream
|
||||
"""
|
||||
md = u"".join([x for x in input_file])
|
||||
return io.StringIO(markdown.markdown(md, extensions=["extra", "admonition", "wikilinks"]))
|
||||
|
||||
|
||||
processor = MarkdownProcessor # pylint: disable=invalid-name
|
||||
1
heckweasel/processors/process_pp.py
Normal file
1
heckweasel/processors/process_pp.py
Normal file
@ -0,0 +1 @@
|
||||
processor = None
|
||||
1
heckweasel/processors/process_sass.py
Normal file
1
heckweasel/processors/process_sass.py
Normal file
@ -0,0 +1 @@
|
||||
processor = None
|
||||
1
heckweasel/processors/process_styl.py
Normal file
1
heckweasel/processors/process_styl.py
Normal file
@ -0,0 +1 @@
|
||||
processor = None
|
||||
69
heckweasel/processors/processors.py
Normal file
69
heckweasel/processors/processors.py
Normal file
@ -0,0 +1,69 @@
|
||||
import abc
|
||||
from typing import Dict, Iterable, Optional
|
||||
|
||||
|
||||
class PassthroughException(Exception):
|
||||
"""Raised when the processor would like the file to pass through unchanged."""
|
||||
|
||||
|
||||
class ProcessorException(Exception): # pragma: no cover
|
||||
"""A base exception class to be used by processor objects."""
|
||||
|
||||
|
||||
class Processor(abc.ABC): # pragma: no cover
|
||||
def __init__(self, *args, **kwargs):
|
||||
"""Initialize the class."""
|
||||
|
||||
@abc.abstractmethod
|
||||
def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the filename of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the previous name for the file.
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new name for the file
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new mimetype of the file after processing
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
|
||||
"""Return the mimetype of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
oldname (str): the input filename
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
str: the new extension of the file after processing
|
||||
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
|
||||
"""Return an iterable object of the post-processed file.
|
||||
|
||||
Arguments:
|
||||
input_file (iterable): An input stream
|
||||
ctx (dict, optional): A context object generated from the processor configuration
|
||||
|
||||
Returns:
|
||||
iterable: The post-processed output stream
|
||||
"""
|
||||
|
||||
def repr(self) -> str:
|
||||
return self.__class__.__name__
|
||||
36
heckweasel/pygments.py
Normal file
36
heckweasel/pygments.py
Normal file
@ -0,0 +1,36 @@
|
||||
"""Map Pygments into the Template API for inclusion in outputs."""
|
||||
from typing import Optional, cast
|
||||
|
||||
import pygments
|
||||
import pygments.formatters
|
||||
import pygments.lexers
|
||||
import pygments.styles
|
||||
import pygments.util
|
||||
|
||||
|
||||
def pygments_markup_contents_html(input_text: str, file_type: str, style: Optional[str] = None) -> str:
|
||||
"""Format input string with Pygments and return HTML."""
|
||||
|
||||
if style is None:
|
||||
style = "default"
|
||||
pyst = pygments.styles.get_style_by_name(style)
|
||||
formatter = pygments.formatters.get_formatter_by_name("html", style=pyst)
|
||||
try:
|
||||
lexer = pygments.lexers.get_lexer_for_filename(file_type)
|
||||
except pygments.util.ClassNotFound:
|
||||
try:
|
||||
lexer = pygments.lexers.get_lexer_by_name(file_type)
|
||||
except pygments.util.ClassNotFound:
|
||||
lexer = pygments.lexers.get_lexer_by_mimetype(file_type)
|
||||
|
||||
return pygments.highlight(input_text, lexer, formatter)
|
||||
|
||||
|
||||
def pygments_get_css(style: Optional[str] = None) -> str:
|
||||
"""Return the CSS styles associated with a particular style definition."""
|
||||
|
||||
if style is None:
|
||||
style = "default"
|
||||
pyst = pygments.styles.get_style_by_name(style)
|
||||
formatter = pygments.formatters.get_formatter_by_name("html", style=pyst)
|
||||
return formatter.get_style_defs()
|
||||
145
heckweasel/template_tools.py
Normal file
145
heckweasel/template_tools.py
Normal file
@ -0,0 +1,145 @@
|
||||
import copy
|
||||
import datetime
|
||||
import glob
|
||||
import itertools
|
||||
import os
|
||||
from typing import Callable, Dict, Iterable, List, Union, cast, Tuple
|
||||
|
||||
import jstyleson
|
||||
|
||||
import pytz
|
||||
|
||||
from .metadata import MetaTree
|
||||
from .processchain import ProcessorChains
|
||||
from .utils import deep_merge_dicts
|
||||
|
||||
|
||||
def file_list(root: str, listcache: Dict) -> Callable:
|
||||
def get_file_list(
|
||||
path_glob: Union[str, List[str], Tuple[str]],
|
||||
*,
|
||||
sort_order: str = "ctime",
|
||||
reverse: bool = False,
|
||||
limit: int = 0) -> Iterable:
|
||||
stattable = cast(List, [])
|
||||
if isinstance(path_glob, str):
|
||||
path_glob = [path_glob]
|
||||
for pglob in path_glob:
|
||||
if pglob in listcache:
|
||||
stattable.extend(listcache[pglob])
|
||||
else:
|
||||
for fil in glob.glob(os.path.join(root, pglob)):
|
||||
if os.path.isdir(fil):
|
||||
continue
|
||||
if fil.endswith(".meta") or fil.endswith("~"):
|
||||
continue
|
||||
st = os.stat(fil)
|
||||
stattable.append(
|
||||
{
|
||||
"file_path": os.path.relpath(fil, root),
|
||||
"file_name": os.path.split(fil)[-1],
|
||||
"mtime": st.st_mtime,
|
||||
"ctime": st.st_ctime,
|
||||
"size": st.st_size,
|
||||
"ext": os.path.splitext(fil)[1],
|
||||
}
|
||||
)
|
||||
listcache[pglob] = stattable
|
||||
ret = sorted(stattable, key=lambda x: x[sort_order], reverse=reverse)
|
||||
if limit > 0:
|
||||
return itertools.islice(ret, limit)
|
||||
return ret
|
||||
|
||||
return get_file_list
|
||||
|
||||
|
||||
def file_list_hier(root: str, flist: Callable) -> Callable:
|
||||
"""Return a callable which, given a directory, will walk the directory and return the files within
|
||||
it that match the glob passed."""
|
||||
|
||||
def get_file_list_hier(path: str, glob: str, *, sort_order: str = "ctime", reverse: bool = False) -> Iterable:
|
||||
output = []
|
||||
|
||||
for pth in os.walk(os.path.join(root, path)):
|
||||
output.extend(
|
||||
flist(
|
||||
os.path.join(os.path.relpath(os.path.realpath(pth[0]), root), glob),
|
||||
sort_order=sort_order,
|
||||
reverse=reverse,
|
||||
)
|
||||
)
|
||||
|
||||
return output
|
||||
|
||||
return get_file_list_hier
|
||||
|
||||
|
||||
def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains, namecache: Dict) -> Callable:
|
||||
def get_file_name(file_name: str) -> Dict:
|
||||
if file_name in namecache:
|
||||
return namecache[file_name]
|
||||
metadata = metatree.get_metadata(file_name)
|
||||
chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
|
||||
namecache[file_name] = chain.output_filename
|
||||
return namecache[file_name]
|
||||
|
||||
return get_file_name
|
||||
|
||||
|
||||
def file_raw(root: str, contcache: Dict) -> Callable:
|
||||
def get_raw(file_name: str) -> str:
|
||||
if file_name in contcache:
|
||||
return contcache[file_name]
|
||||
with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
return get_raw
|
||||
|
||||
|
||||
def file_json(root: str) -> Callable:
|
||||
def get_json(file_name: str, parent: Dict = None) -> Dict:
|
||||
outd = {}
|
||||
if parent is not None:
|
||||
outd = copy.deepcopy(parent)
|
||||
|
||||
with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
|
||||
return deep_merge_dicts(outd, jstyleson.load(f))
|
||||
|
||||
return get_json
|
||||
|
||||
|
||||
def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable:
|
||||
def get_file_content(file_name: str) -> Iterable:
|
||||
if file_name in contcache:
|
||||
return contcache[file_name]
|
||||
metadata = metatree.get_metadata(file_name)
|
||||
chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
|
||||
contcache[file_name] = chain.output
|
||||
return str(chain.output)
|
||||
|
||||
return get_file_content
|
||||
|
||||
|
||||
def file_metadata(metatree: MetaTree) -> Callable:
|
||||
def get_file_metadata(file_name: str) -> Dict:
|
||||
return metatree.get_metadata(file_name)
|
||||
|
||||
return get_file_metadata
|
||||
|
||||
|
||||
def time_iso8601(timezone: str) -> Callable:
|
||||
tz = pytz.timezone(timezone)
|
||||
|
||||
def get_time_iso8601(time_t: Union[int, float]) -> str:
|
||||
return datetime.datetime.fromtimestamp(time_t, tz).isoformat("T")
|
||||
|
||||
return get_time_iso8601
|
||||
|
||||
|
||||
def date_iso8601(timezone: str) -> Callable:
|
||||
tz = pytz.timezone(timezone)
|
||||
|
||||
def get_date_iso8601(time_t: Union[int, float]) -> str:
|
||||
return datetime.datetime.fromtimestamp(time_t, tz).strftime("%Y-%m-%d")
|
||||
|
||||
return get_date_iso8601
|
||||
0
heckweasel/tests/unit/__init__.py
Normal file
0
heckweasel/tests/unit/__init__.py
Normal file
6
heckweasel/tests/unit/test_processchain.py
Normal file
6
heckweasel/tests/unit/test_processchain.py
Normal file
@ -0,0 +1,6 @@
|
||||
class TestProcessChain:
|
||||
def test_process_chain(self):
|
||||
pass
|
||||
|
||||
def test_processor_chain(self):
|
||||
pass
|
||||
72
heckweasel/utils.py
Normal file
72
heckweasel/utils.py
Normal file
@ -0,0 +1,72 @@
|
||||
from typing import Dict, Optional
|
||||
import copy
|
||||
import mimetypes
|
||||
import os
|
||||
|
||||
|
||||
def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict:
|
||||
"""Merge two dictionaries (shallow).
|
||||
|
||||
Arguments:
|
||||
dict_a (dict): The dictionary to use as the base.
|
||||
dict_b (dict): The dictionary to update the values with.
|
||||
|
||||
Returns:
|
||||
dict: A new merged dictionary.
|
||||
|
||||
"""
|
||||
dict_z = dict_a.copy()
|
||||
dict_z.update(dict_b)
|
||||
return dict_z
|
||||
|
||||
|
||||
def deep_merge_dicts(dict_a: Dict, dict_b: Dict, _path=None, cpy=False) -> Dict:
|
||||
"""Merge two dictionaries (deep).
|
||||
https://stackoverflow.com/questions/7204805/how-to-merge-dictionaries-of-dictionaries/7205107#7205107
|
||||
|
||||
Arguments:
|
||||
dict_a (dict): The dictionary to use as the base.
|
||||
dict_b (dict): The dictionary to update the values with.
|
||||
_path (list): internal use.
|
||||
|
||||
Returns:
|
||||
dict: A new merged dictionary.
|
||||
|
||||
"""
|
||||
if cpy:
|
||||
dict_a = copy.deepcopy(dict_a)
|
||||
if _path is None:
|
||||
_path = []
|
||||
for key in dict_b:
|
||||
if key in dict_a:
|
||||
if isinstance(dict_a[key], dict) and isinstance(dict_b[key], dict):
|
||||
deep_merge_dicts(dict_a[key], dict_b[key], _path + [str(key)])
|
||||
elif dict_a[key] == dict_b[key]:
|
||||
pass # same leaf value
|
||||
else:
|
||||
dict_a[key] = copy.deepcopy(dict_b[key])
|
||||
else:
|
||||
dict_a[key] = dict_b[key]
|
||||
return dict_a
|
||||
|
||||
|
||||
def guess_mime(path: str) -> Optional[str]:
|
||||
"""Guess the mime type for a given path.
|
||||
|
||||
Arguments:
|
||||
root (str): the root path of the file tree
|
||||
path (str): the sub-path within the file tree
|
||||
|
||||
Returns:
|
||||
str: the guessed mime-type
|
||||
|
||||
"""
|
||||
mtypes = mimetypes.guess_type(path)
|
||||
ftype = None
|
||||
if os.path.isdir(path):
|
||||
ftype = "directory"
|
||||
elif os.access(path, os.F_OK) and mtypes[0]:
|
||||
ftype = mtypes[0]
|
||||
else:
|
||||
ftype = "application/octet-stream"
|
||||
return ftype
|
||||
Reference in New Issue
Block a user