Initial check-in post-discontinuity.

This commit is contained in:
Cassowary 2023-09-25 18:30:10 -07:00
commit 6556164879
24 changed files with 1063 additions and 0 deletions

61
.gitignore vendored Normal file

@@ -0,0 +1,61 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64
__pycache__
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Emacs git ignore
# -*- mode: gitignore; -*-
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
# Org-mode
.org-id-locations
*_archive
# flymake-mode
*_flymake.*
# eshell files
/eshell/history
/eshell/lastdir
# elpa packages
/elpa/

11
LICENSE Normal file

@@ -0,0 +1,11 @@
Copyright 2021 Aldercone Studio
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

56
README.md Normal file

@@ -0,0 +1,56 @@
# carkov #
This is a library for creating and walking simple markov chains. It is
meant for things like text generators (such as ebooks bots and word
generators) and thus is not 'mathematically correct'. It has some
tools for doing text analysis but more are planned in the future
(stubs exist to illustrate some plans, see TODO.md).
## Command line interface ##
This library includes a command line interface to analyze text and
then walk the resulting chain to generate text from the analysis.
To analyze a corpus of text files:
`carkov analyze mychain.chain textfile1.txt textfile2.txt ... textfileN.txt`
To walk a chain and generate text from it:
`carkov chain mychain.chain -c 10`
There are two analysis modes currently supported, `english` and
`word`, selected with the `-m` argument of the `analyze`
command. `english` mode analyzes the input word-wise: the input is
segmented into (English-style) sentences, each of which is analyzed
as a separate chain of words. `word` mode segments the input into
tokens, each of which is analyzed separately as a series of
characters.
Analysis also allows a window size to be specified, so that each key
in the chain is a fixed-length series of items (for example, the word
`foo` with a window of 2 analyzes to `(_, _) -> f`, `(_, f) -> o`,
`(f, o) -> o`, and so on). The wider the window, the more similar or
identical to the input stream the output becomes, since there are
fewer total options to follow any given key. The window size is
specified with the `-w` argument of the `analyze` command.
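For example, to analyze a word list character-wise with a window of 3
(the file names here are illustrative):
`carkov analyze -m word -w 3 words.chain wordlist.txt`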
## About the Library ##
The library itself exposes objects and interfaces that do the same as
the command line above. Generating documentation and examples is a
todo item for this project, but looking at the contents of
`__main__.py` should be instructive. The library is written to be
largely agnostic about the items that are chained, and hypothetically
any sequential set of things could work. Some framework would have to
be written to display such output, but non-textual data should be
workable if desired.
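Until proper documentation exists, here is a minimal sketch of
driving the library directly, mirroring what `__main__.py` does (the
corpus file name is illustrative):

```python
import pathlib
from random import Random

from carkov.analyze.english import English
from carkov.chain import from_analyzer
from carkov.utils import make_sent

analyzer = English(2)  # window / order of 2
analyzer.analyze(pathlib.Path("corpus.txt").read_text("utf-8"))
chain = from_analyzer(analyzer)  # build a Chain from the analyzer's counts
print(make_sent(chain.walk(Random(), True)))  # weighted walk, joined into a sentence
```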
The library also provides a few mechanisms for serializing a
ready-to-use chain for reuse in other projects. The command line uses
the binary serialization mechanism (which uses `msgpack`) to save
chains from the analysis step for reuse in the chain step. There is
also a mechanism which produces a python source file that can be
embedded in a target project, so that a python project can use the
chain without having to include an extra data file. It should be
noted that this is of course extremely inefficient for large chains.
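A minimal sketch of the binary round trip, assuming `chain` is a
`Chain` as in the sketch above (the file name is illustrative):

```python
from carkov.serializer import dump_chainer, load_chainer

with open("mychain.chain", "wb") as out:  # msgpack-based binary serialization
    dump_chainer(chain, out)

with open("mychain.chain", "rb") as inp:
    chain = load_chainer(inp)

# carkov.pydumper provides the python-source variant; it writes to a text stream instead.
```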

9
TODO.md Normal file

@@ -0,0 +1,9 @@
* Implement text filters
** Implement the abstractize-number filter, which will take any number as input and return a NUMBER abstract.
** Implement the abstractize-roman-numeral filter, which will take a token that looks like a roman numeral (except for I)
and return a NUMBER abstract.
** Implement the punctuation stripper / abstractizer, which will take a punctuation token and return an abstract or abort
the token.
** Implement the asciifier filter, which will take a unicode string and return an ascii approximation.
* Implement some example code.
* Complete documentation and introductions so they are actually useful to users.

3
build.sh Executable file

@@ -0,0 +1,3 @@
#!/bin/sh
python3 -m build

7
carkov/__init__.py Normal file

@@ -0,0 +1,7 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
version = '0.1.2'

171
carkov/__main__.py Normal file

@@ -0,0 +1,171 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
# This module provides a command line interface to doing some common operations.
#
import argparse
import enum
import pathlib
import sys
import traceback
from random import Random
from typing import cast
from .analyze.abstract import AbstractAnalyzer
from .analyze.english import English
from .analyze.words import Words
from .chain import Chain, from_analyzer
from .serializer import dump_chainer, load_chainer
from .utils import make_sent, make_word
ERROR_NO_FILE = 1
ERROR_EXISTING_OUTPUT = 2
ERROR_WRONG_ORDER = 3
ERROR_WRONG_CLASS = 4
class AnalyzeMode(enum.Enum):
english = "english"
word = "word"
def __str__(self) -> str:
return self.value
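# map analyzer class names to the joiner used to turn walk output back into a string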
JOINERS = {"Words": make_word, "English": make_sent}
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(prog="python -mcarkov",
description=("Process a text corpus in a markov chain fashion and/or output from"
"an analysis."))
subparsers = parser.add_subparsers(dest='command')
analyze_sub = subparsers.add_parser('analyze', help="Analyze a corpus")
analyze_sub.add_argument('output', help="Output chain to specified destination", type=pathlib.Path)
analyze_sub.add_argument('input', help="The corpus to analyze", type=pathlib.Path, nargs='+')
overappend = analyze_sub.add_mutually_exclusive_group()
overappend.add_argument('-o', '--overwrite', help='Overwrite output file.', action='store_true')
overappend.add_argument('-a', '--append', help='Append output file.', action='store_true')
analyze_sub.add_argument('-w', '--window', help='Select length of analysis window', type=int, default=2)
analyze_sub.add_argument('-m',
'--mode',
help='Select analysis mode',
type=AnalyzeMode,
choices=list(AnalyzeMode),
default=AnalyzeMode.english)
analyze_sub.add_argument('-t', '--test', help="Output a sample from the chainer generated", action='store_true')
chain_sub = subparsers.add_parser('chain', help="Output from a chainer")
chain_sub.add_argument('input', help="The chain file to load", type=pathlib.Path)
chain_sub.add_argument('-c', '--count', help="Number of chain outputs to output", type=int, default=10)
chain_sub.add_argument('-m', '--maxlen', help="Maximum length in tokens of output (0 is unlimited)", type=int, default=0)
return parser.parse_args()
def print_chainer_output(chainer: Chain, random_state: Random):
if chainer.analyzer_class in JOINERS:
print(JOINERS[cast(str, chainer.analyzer_class)](chainer.walk(random_state, True)))
else:
print(chainer.walk(random_state, True))
def command_analyze(args: argparse.Namespace) -> int:
if not any(x.exists() for x in args.input):
print("Must specify an existing file as input for the analyzer.")
return ERROR_NO_FILE
if args.output.exists() and not (args.overwrite or args.append):
print("Output file exists, pass --overwrite to overwrite or --append to add to exsiting analysis.")
return ERROR_EXISTING_OUTPUT
analyzer: AbstractAnalyzer
if args.mode == AnalyzeMode.english:
analyzer = English(args.window)
# we just dump a whole file into the english analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
analyzer.analyze(inp.read_text('utf-8'))
else:
analyzer = Words(args.window)
# we do line-by-line single word dumps into the word analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
with inp.open('r') as words:
for line in words:
analyzer.analyze(line.strip().lower())
if args.append:
# in append mode we load an existing chain file, and then run the analyzer and merge the contents.
with open(args.output, 'rb') as old:
chainer = load_chainer(old)
if chainer.order != analyzer.order:
print("Append chainer order isn't the same as the analyzer order.")
return ERROR_WRONG_ORDER
if chainer.analyzer_class != analyzer.__class__.__name__:
print("Append chainer class isn't the same as analyzer class.")
return ERROR_WRONG_CLASS
chainer.integrate(analyzer.chain_counts)
else:
chainer = from_analyzer(analyzer)
with open(args.output, 'wb') as output:
dump_chainer(chainer, output)
print(f"Wrote chainer to {args.output}")
if args.test:
r = Random()
for _ in range(0, 5):
print_chainer_output(chainer, r)
return 0
def command_chain(args: argparse.Namespace) -> int:
r = Random()
if not args.input.exists():
print("Must specify a chain file to load.")
return ERROR_NO_FILE
with args.input.open('rb') as inp:
chainer = load_chainer(inp)
if args.count < 1:
args.count = 1
for _ in range(0, args.count):
print_chainer_output(chainer, r)
return 0
def main() -> int:
args = parse_arguments()
if args.command == 'analyze':
return command_analyze(args)
elif args.command == 'chain':
return command_chain(args)
else:
print("Expect a command `analyze` or `chain`. See --help for details.")
return 1
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception:
print("Unexpected exception!")
traceback.print_exc()

54
carkov/abstracts.py Normal file

@@ -0,0 +1,54 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
This module provides a few utility objects, especially the Abstract object which is used for terminals
and other abstract tokens.
"""
class CarkovFilterException(BaseException):
"""
Base exception for filter stages.
"""
class Abort(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort a particular token from being added to the
stream.
"""
class AbortSegment(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort an entire segment if a particular token would
be rejected.
"""
class Abstract:
"""
This is used as a way to indicate abstract tokens in a stream of tokens.
"""
def __init__(self, name: str):
self.name = name
def __repr__(self) -> str:
if self == NUMBER:
return 'NUMBER'
elif self == TERMINAL:
return 'TERMINAL'
return f"carkov.abstracts.Abstract({self.name})"
NUMBER = Abstract("NUMBER")
"""A universal Number abstract."""
TERMINAL = Abstract("TERMINAL")
"""A Universal Terminal abostract."""

5
carkov/analyze/__init__.py Normal file

@@ -0,0 +1,5 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#

120
carkov/analyze/abstract.py Normal file

@@ -0,0 +1,120 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
This module defines the base class for analyzers which do basic statistical analysis on a corpus.
"""
from abc import ABC, abstractmethod
from collections import deque, defaultdict
from ..abstracts import TERMINAL
from ..utils import merge_dict
class AbstractAnalyzer(ABC):
def __init__(self, order, filters=None):
"""
Initialize the analyzer.
Arguments:
order (int): Defines the window size this analyzer uses.
filters: A list of callables to apply to each token before processing.
"""
if filters is None:
filters = []
self.order = order
self.filters = filters
self.tokens = {}
self.chain_counts = {}
def analyze(self, corpus):
"""
Analyze a corpus and integrate the data into the internal state.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
self.chain_counts after processing.
"""
counts = self.analyze_corpus(corpus)
merge_dict(self.chain_counts, counts)
return self.chain_counts
def analyze_corpus(self, corpus):
"""
Do the actual analysis of corpus, and return a count dictionary.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
(dict) a count dictionary of just this corpus
"""
segments = self.segmentize_corpus(corpus)
counts = {}
for segment in segments:
merge_dict(counts, self.analyze_segment(segment))
return counts
@abstractmethod
def segmentize_corpus(self, corpus):
"""
Convert a corpus into a series of segments.
This must be overloaded by child class.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
(array of abstract): An array of segments that this class is prepared to process.
"""
def analyze_segment(self, segment):
"""
Process a segment into a series of tokens.
Arguments:
segment (abstract): This could be of any type that this class is prepared to process.
Returns:
(counts dictionary): A dictionary keyed by windowed token keys with counts of each following token
"""
tokens = self.tokenize_segment(segment) + [TERMINAL]
token = deque([None] * self.order, self.order)
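# `token` is a sliding window of the last `order` tokens; e.g. with order=2 the word 'foo' yields
# (None, None) -> 'f', (None, 'f') -> 'o', ('f', 'o') -> 'o', ('o', 'o') -> TERMINAL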
counts = defaultdict(lambda: defaultdict(int))
for raw_token in tokens:
raw_token = self.process_token(raw_token)
tkey = tuple(token)
counts[tkey][raw_token] += 1
token.append(raw_token)
return counts
@abstractmethod
def tokenize_segment(self, segment):
"""
Convert a segment into a series of tokens.
This must be overloaded by child class.
Arguments:
segment (abstract): This could be of any type that this class is prepared to process.
Returns:
(array of tokens): The format and type of tokens is defined by the child class.
"""
...
def process_token(self, raw_token):
for filter in self.filters:
raw_token = filter(raw_token)
return raw_token

26
carkov/analyze/english.py Normal file

@@ -0,0 +1,26 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
import nltk
from .abstract import AbstractAnalyzer
class English(AbstractAnalyzer):
def __init__(self, order, filters=None):
if filters is None:
filters = []
super().__init__(order, filters)
def segmentize_corpus(self, corpus):
chunks = corpus.split('\n\n')
ret = []
for chunk in chunks:
ret = ret + nltk.sent_tokenize(chunk)
return ret
def tokenize_segment(self, segment):
return list(nltk.word_tokenize(segment))

20
carkov/analyze/words.py Normal file

@@ -0,0 +1,20 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
from .abstract import AbstractAnalyzer
class Words(AbstractAnalyzer):
def __init__(self, order, filters=None):
if filters is None:
filters = []
super().__init__(order, filters)
def segmentize_corpus(self, corpus):
return corpus.split(' ')
def tokenize_segment(self, segment):
return list(segment)

166
carkov/chain.py Normal file

@@ -0,0 +1,166 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
This module defines a chainer class which can process a count dictionary from an analyzer
and provides convenience functions for walking the chain.
"""
from collections import UserDict, deque
from typing import Any, Dict, Tuple, Optional
from .abstracts import TERMINAL
from .analyze.abstract import AbstractAnalyzer
from .utils import merge_stats, convert_counts, weighted_stat_choice
ChainType = Dict[Tuple[Any, ...], Any]
def from_analyzer(analyzer: AbstractAnalyzer):
"""
Static initializer: Return a chainer with parameters and contents based on an analyzer instance.
"""
chainer = Chain(analyzer.order, analyzer.__class__.__name__)
chainer.integrate(analyzer.chain_counts)
return chainer
class Chain(UserDict):
def __init__(self, order: int, analyzer_class: Optional[str] = None):
"""
Initialize Chain class
Arguments:
order: The window size of this chainer.
"""
self.order = order
self.data: ChainType = {}
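# the all-None window that keys the start of every segment (matches the analyzer's initial deque)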
self.start_token = (None, ) * self.order
self.analyzer_class = analyzer_class
def integrate(self, counts: ChainType):
"""
Accept a counts dictionary and merge it with local data and recalculate statistical relationships between
outcomes. The counts must be from an analyzer of the same order.
Arguments:
counts: A counts dictionary as contained in the analyzer's chain_counts
"""
for key, count in counts.items():
stat = convert_counts(count)
if key in self.data:
merge_stats(self.data[key], stat)
else:
self.data[key] = stat
self.update_stats()
def merge(self, other):
"""
Merge a separate chainer's data into this chainer. They must be of the same order.
Arguments:
other (Chain): Another chain of the same order.
"""
for key, stat in other.items():
if key in self.data:
merge_stats(self.data[key], stat)
else:
self.data[key] = stat
self.update_stats()
def update_stats(self):
"""
Update all of the statistical ratios in the chain.
"""
for token in self.data:
self.update_stat(token)
def update_stat(self, parent_token):
"""
Update one specific set of statistical ratios in the chain.
Arguments:
parent_token: A windowed token tuple which points at the part of the chain to update
"""
stat = self.data[parent_token]
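# each value is a [count, weight] pair; recompute weight as the count's integer percentage of the total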
total = sum([s[0] for s in stat.values()])
for it in stat.values():
it[1] = int((it[0] / total) * 100)
def add(self, parent_token, token):
"""
Add a new count to the chain.
Arguments:
parent_token: A windowed token tuple which points to the location to add the new token.
token: The token to add.
"""
if parent_token not in self.data:
self.data[parent_token] = {}
if token in self.data[parent_token]:
self.data[parent_token][token][0] += 1
self.update_stat(parent_token)
else:
self.data[parent_token][token] = [1, 100]
def select(self, parent_token, random_generator, weighted=False):
"""
Select a token from a given parent token.
Arguments:
parent_token: A windowed token tuple
random_generator: A random.Random instance
weighted (bool, default=False): Whether to do a weighted select or a random select.
Returns:
A token
"""
if parent_token not in self.data:
return None
if weighted:
return weighted_stat_choice(random_generator, self.data[parent_token])
else:
return random_generator.choice(list(self.data[parent_token].keys()))
def walk(self, random_generator, weighted=False, maximum=1000):
"""
Return a list of tokens by walking the chain.
Arguments:
random_generator: A random.Random instance
weighted: Whether to do a weighted select at each step.
maximum: The maximum number of walk steps to perform.
Returns:
A list of tokens
"""
token = self.start_token
item = None
output = []
while (len(output) < maximum):
item = self.select(token, random_generator, weighted)
if item == TERMINAL:
return output
output.append(item)
token = self.next_token(token, item)
def next_token(self, parent_token, token):
"""
Given a windowed token tuple and a token, return the next windowed token tuple.
Arguments:
parent_token: A windowed token tuple
token: A token
Returns:
A windowed token tuple which would be the next step in the chain after the token.
"""
q = deque(parent_token, self.order)
q.append(token)
return tuple(q)

35
carkov/filters.py Normal file

@@ -0,0 +1,35 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Various filter functions that may be useful for processing certain kinds of corpora.
"""
from typing import Optional
# from unidecode import unidecode # fixme asciifying filter
# All of these filters operate on string tokens
def str_abstractize_numbers(token: str) -> Optional[str]:
"""Replace all numbers with a Number abstract."""
return None
def str_abstractize_roman(token: str) -> Optional[str]:
"""Replace roman numerals with a Number abstract."""
return None
def str_strip_punct(token: str) -> Optional[str]:
"""Remove any punctuation characters."""
return None
def str_asciify(token: str) -> Optional[str]:
"""Convert all characters to an ascii approximation."""
return None

37
carkov/pydumper.py Normal file

@@ -0,0 +1,37 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Serialize chain as a python structure.
"""
from io import TextIOBase
from . import version
from .chain import Chain
template = """
# serialized from version {version}
def get_chainer():
from carkov.chain import Chain
from carkov.abstracts import NUMBER, TERMINAL, Abstract
chain = Chain({order}, "{analyzer}")
chain.data = {data}
return chain
"""
def dump_chainer(chain: Chain, outfile: TextIOBase):
"""
Serialize a chainer to an open IO stream
Arguments:
chain: A Chain object
outfile: An open IO stream in text mode that will be written to
"""
outfile.write(template.format(version=version,
order=chain.order,
analyzer=chain.analyzer_class,
data=repr(chain.data).replace(']},', ']},\n')))

77
carkov/serializer.py Normal file

@@ -0,0 +1,77 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Use msgpack to serialize a chainer to disk and then reread it from a serialized file.
"""
from typing import Any, BinaryIO, Dict, Tuple, cast
import msgpack
from . import version
from .abstracts import NUMBER, TERMINAL, Abstract
from .chain import Chain
def _unserialize_encode_helper(obj: Dict) -> Any:
"""
This is a helper function which handles Abstract objects for serialization.
"""
if '$$####$$' in obj:
val: Abstract
if obj['n'] == 'TERMINAL':
val = TERMINAL
elif obj['n'] == 'NUMBER':
val = NUMBER
else:
val = Abstract(obj['n'])
return val
return obj
def _serialize_encode_helper(obj: Any) -> Any:
"""
This is a helper function which handles Abstract objects for serialization.
"""
if isinstance(obj, Abstract):
obj = {'$$####$$': True, 'n': cast(Abstract, obj).name}
return obj
def load_chainer(infile: BinaryIO) -> Chain:
"""
Unserialize a chainer from an open IO stream
Arguments:
infile: An open IO stream in binary mode pointing at a messagepack stream
Returns:
a new Chain object initialized with the contents of the stream.
"""
serialdict = msgpack.unpackb(infile.read(), object_hook=_unserialize_encode_helper, raw=False, strict_map_key=False)
if serialdict['version'] != version:
import warnings
warnings.warn(f"Version mismatch while loading chain expect: [{version}] got: [{serialdict['version']}]")
chain = Chain(serialdict['order'], serialdict['analyzer_class'])
chain.data = {cast(Tuple[Any], tuple(x)): y for x, y in serialdict['data']}
return chain
def dump_chainer(chain: Chain, outfile: BinaryIO):
"""
Serialize a chainer to an open IO stream
Arguments:
chain: A Chain object
outfile: An open IO stream in binary mode that will be written to
"""
serialdict: Dict[str, Any] = {}
serialdict['version'] = version
serialdict['order'] = chain.order
serialdict['analyzer_class'] = chain.analyzer_class
serialdict['data'] = [(k, v) for k, v in chain.items()]
outfile.write(msgpack.packb(serialdict, use_bin_type=True, default=_serialize_encode_helper))

129
carkov/utils.py Normal file

@@ -0,0 +1,129 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <alderconestudio@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Various utilities the chainers and analyzers use.
"""
import string
from bisect import bisect
from typing import Dict, Callable, List, Sequence
from random import Random
def merge_dict(into: Dict, outof: Dict, mergefunction: Callable = lambda x, y: x + y) -> Dict:
"""
Given two dictionaries of dictionaries, merge them together by applying the mergefunction to the
values of the second level dictionary.
Arguments:
into: The dictionary that is being operated on and gets modified.
outof: The dictionary to merge into `into`.
mergefunction: A function applied to every value in the second-level dictionaries; defaults to
adding the two values together.
Returns:
into dictionary after modification.
"""
for key in outof.keys():
if key in into:
for innerkey in outof[key].keys():
if innerkey in into[key]:
into[key][innerkey] = mergefunction(into[key][innerkey], outof[key][innerkey])
else:
into[key][innerkey] = outof[key][innerkey]
else:
into[key] = outof[key]
return into
def convert_counts(ind: Dict) -> Dict:
"""
Convert counts produced by analyzers into the statistics counts used by chainers.
Arguments:
ind (dict): The second level dictionary of a counts dictionary
Returns:
dict: A copy of ind with the values updated for chainer use.
"""
out = {}
for k in ind:
out[k] = [ind[k], 0]
return out
def merge_stats(into: Dict, outof: Dict) -> Dict:
"""
Perform a merge_dict in a way safe for the statistics dictionaries used by chainers.
Arguments:
into: The dictionary to modify
outof: The dictionary to merge into `into`.
Returns:
into (after modification)
"""
def stats_merge_function(i, o):
out = [0, 0]
out[0] = i[0] + o[0]
out[1] = 0
return out
return merge_dict(into, outof, stats_merge_function)
def weighted_choice(random_state: Random, values: Sequence, weights: Sequence):
"""
Choose a random value in a weighted manner.
Arguments:
random_state: A random.Random instance
values: A list of values to choose from
weights: The weights that correspond to each value
Returns:
The selected value
"""
total: float = 0
cum_weights: List[float] = []
for w in weights:
total += w
cum_weights.append(total)
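# draw a uniform point in [0, total) and bisect the cumulative weights; each value owns a span equal to its weight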
x = random_state.random() * total
i = bisect(cum_weights, x)
return values[i]
def weighted_stat_choice(random_state: Random, stats: Dict):
"""
Perform a weighted choice on a stat dictionary as used in chainers.
Arguments:
random_state: A random.Random instance
stats: A stats dictionary from a chainer
"""
values = tuple(stats.keys())
weights = tuple(stats[x][1] for x in values)
return weighted_choice(random_state, values, weights)
def make_word(seq: Sequence[str]):
return "".join(seq)
def make_sent(seq: Sequence[str]) -> str:
output = ""
for item in seq:
if item in string.punctuation:
output += item
else:
output += (" " + item) if output else (item)
return output

5
clean.sh Executable file

@@ -0,0 +1,5 @@
#!/bin/sh
rm -rv build/
rm -rv carkov.egg-info/
rm -rv dist/

3
pyproject.toml Normal file

@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

41
setup.cfg Normal file

@@ -0,0 +1,41 @@
[metadata]
name = carkov
version = 0.1.2
description = A markov chainer library
author = Aldercone Studio
author_email = alderconestudio@gmail.com
keywords = text, markov, ebooks, chainer, generator, generative
long_description = file: README.md
long_description_content_type = text/markdown
license_file = LICENSE
url=https://github.com/aldercone/carkov
license = BSD
platform = any
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Developers
License :: OSI Approved :: BSD License
Operating System :: OS Independent
Programming Language :: Python
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: Artistic Software
Topic :: Text Processing
Topic :: Text Processing :: Linguistic
[options]
packages =
carkov
carkov.analyze
zip_safe = true
install_requires =
unidecode
nltk
msgpack
[options.entry_points]
console_scripts =
carkov = carkov.__main__:main

2
setup.py Normal file

@@ -0,0 +1,2 @@
import setuptools
setuptools.setup()

17
tox.ini Normal file

@@ -0,0 +1,17 @@
[tox]
envlist = py36, py37, py38, py39
[testenv]
deps =
flake8
mypy
commands =
flake8
mypy carkov typehints
[flake8]
max-line-length = 120
max-complexity = 15

4
typehints/msgpack.pyi Normal file

@@ -0,0 +1,4 @@
from typing import Any, Callable, Dict
def packb(o: Any, use_bin_type: bool, default: Callable) -> bytes: ...
def unpackb(data: bytes, raw: bool, object_hook: Callable) -> Dict: ...

4
typehints/nltk.pyi Normal file

@@ -0,0 +1,4 @@
from typing import Iterator
def sent_tokenize(sent: str) -> Iterator[str]: ...
def word_tokenize(word: str) -> Iterator[str]: ...