"""Markov-chain text generator backed by a persisted, bounded corpus."""

import atexit
import os.path
import re

import markovify
import ujson

from config import config

# Upper bound on how many times generate() re-walks the chain when a walk
# comes back empty; keeps a degenerate chain from looping or recursing forever.
_MAX_GENERATE_ATTEMPTS = 100

# Matches a punctuation token (optionally preceded by a space) that is followed
# by a space or end of string; used to glue punctuation back onto the
# preceding word after the tokens were joined with spaces.
_JOIN_PUNCT_RE = re.compile(
    r"(?:^| )?((\?\.\.)|(\.{2,})|(\!{2,})|(\?{2,})|([.?!,:;\(\)\"'\$\+\-–—…]))(?: |$)"
)

# Matches "@name" mentions with an optional trailing comma.
# NOTE(review): only lowercase letters, digits and "_" are matched, so a
# mention containing uppercase letters is not removed -- confirm that callers
# lowercase the text first, or that this is intended.
_MENTION_RE = re.compile(r"(@[a-z0-9_]+,?)")

# Matches http/https URLs.
_URL_RE = re.compile(
    "https?:\\/\\/(?:www\\.)?[-a-zA-Z0-9@:%._\\+~#=]{1,256}\\.[a-zA-Z0-9()]{1,6}\\b(?:[-a-zA-Z0-9()@:%_\\+.~#?&\\/=]*)"
)

# Matches a punctuation token; used to pad punctuation with spaces so that it
# becomes a separate token when the text is split on spaces.
_SPLIT_PUNCT_RE = re.compile(
    r"((\?\.\.)|(\.{2,})|(\!{2,})|(\?{2,})|[.?!,:;\(\)\"'\$\+\-–—…])"
)


class Markov:
    """Collect tokenized messages into a corpus and generate text from it.

    The corpus is a list of token lists, newest first, capped at
    ``config.MARKOV_CORPUS_SIZE`` entries. The chain is rebuilt every
    ``config.MARKOV_REBUILD_RATE`` newly added entries. Both the chain and the
    corpus are loaded on construction and saved at interpreter exit.
    """

    def __init__(self):
        self.counter = 0  # new corpus entries since the last rebuild
        self.corpus = []  # list of token lists, newest first
        self.chain = None  # markovify.Chain, or None until built/loaded

        self.load()
        # Persist whatever state exists when the interpreter shuts down.
        atexit.register(self.save)

    @property
    def is_ready(self):
        """Return True once a chain has been loaded or built."""
        return self.chain is not None

    def generate(self):
        """Return one generated sentence.

        The chain is walked until it yields a non-empty token list, at most
        ``_MAX_GENERATE_ATTEMPTS`` times. Tokens are joined with spaces and
        punctuation is re-attached to the preceding word.

        Returns:
            The generated text, or an empty string if every attempt produced
            an empty walk.

        Raises:
            AttributeError: if no chain is available yet (check ``is_ready``).
        """
        # Bounded loop instead of self-recursion: a chain that keeps yielding
        # empty walks must not end in RecursionError.
        for _ in range(_MAX_GENERATE_ATTEMPTS):
            words = self.chain.walk()
            if words:
                break
        else:
            return ""

        text = " ".join(words)
        text = _JOIN_PUNCT_RE.sub(r"\1 ", text)
        return text.strip()

    def rebuild(self):
        """Rebuild and compile the chain from the corpus; reset the counter."""
        self.chain = markovify.Chain(self.corpus, config.MARKOV_STATE_SIZE).compile()
        self.counter = 0

    def extend_corpus(self, text):
        """Tokenize *text* and add it to the corpus if it is new.

        Newlines become spaces, "@mentions" and URLs are removed, punctuation
        is split into separate tokens, and empty tokens are dropped. Messages
        that end up with no tokens, or that are already in the corpus, are
        ignored.

        Args:
            text: Raw message text.
        """
        text = text.strip()
        if not text:
            return

        text = text.replace("\n", " ")
        text = _MENTION_RE.sub("", text)
        text = _URL_RE.sub("", text)
        text = _SPLIT_PUNCT_RE.sub(r" \1 ", text)

        stripped = (piece.strip() for piece in text.split(" "))
        tokens = [token for token in stripped if token]

        # A message made only of mentions/URLs leaves nothing behind. Storing
        # an empty run would teach the chain a BEGIN -> END transition and
        # make it produce empty sentences.
        if not tokens:
            return

        if tokens in self.corpus:
            return

        # Newest entries live at the front; the oldest fall off the end.
        self.corpus.insert(0, tokens)
        if len(self.corpus) > config.MARKOV_CORPUS_SIZE:
            del self.corpus[config.MARKOV_CORPUS_SIZE:]

        # NOTE(review): the counter is advanced only for newly added entries;
        # confirm this matches the intended rebuild cadence.
        self.counter += 1
        if self.counter % config.MARKOV_REBUILD_RATE == 0:
            self.rebuild()

    def load(self):
        """Load the chain and the corpus from disk when their files exist."""
        if os.path.isfile(config.MARKOV_CHAIN_PATH):
            # Explicit encoding so reading does not depend on the locale.
            with open(config.MARKOV_CHAIN_PATH, "r", encoding="utf-8") as f:
                self.chain = markovify.Chain.from_json(f.read())

        if os.path.isfile(config.MARKOV_CORPUS_PATH):
            with open(config.MARKOV_CORPUS_PATH, "r", encoding="utf-8") as f:
                self.corpus = ujson.load(f)

    def save(self):
        """Write the chain (if one exists) and the corpus to disk."""
        if self.chain:
            with open(config.MARKOV_CHAIN_PATH, "w", encoding="utf-8") as f:
                f.write(self.chain.to_json())

        with open(config.MARKOV_CORPUS_PATH, "w", encoding="utf-8") as f:
            ujson.dump(self.corpus, f)