"""Markov-chain text generator backed by a rolling, persisted corpus."""

import os.path
import re
import atexit
import string

import spacy
import ujson
import markovify

from config import config

# @mentions (with an optional trailing comma) are stripped before tokenizing.
_MENTION_RE = re.compile(r"(@[A-Za-z0-9_]+,?)")

# http(s) URLs are stripped before tokenizing.
_URL_RE = re.compile(
    r"https?:\/\/(?:www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b(?:[-a-zA-Z0-9()@:%_\+.~#?&\/=]*)"
)


class Markov:
    """Maintain a corpus of tokenized sentences and a Markov chain built from it.

    The chain and corpus are loaded from the paths in ``config`` on
    construction, and ``save`` is registered with ``atexit`` so both are
    written back when the interpreter exits.
    """

    def __init__(self):
        # Number of sentences added since the last rebuild.
        self.counter = 0
        # Tokenized sentences (lists of str), newest first.
        self.corpus = []
        # Compiled markovify chain; None until loaded from disk or rebuilt.
        self.chain = None
        self.nlp = spacy.load("xx_sent_ud_sm")
        self.load()
        atexit.register(self.save)

    @property
    def is_ready(self):
        """Return True once a chain is available for ``generate``."""
        return self.chain is not None

    def generate(self, init_state=None):
        """Generate a sentence, optionally continuing from ``init_state``.

        Args:
            init_state: Optional seed text. It is tokenized and used as the
                chain's starting state; the returned text begins with the
                seed exactly as it was passed in.

        Returns:
            The generated text with surrounding whitespace stripped.

        Raises:
            AttributeError: If no chain has been loaded or built yet; check
                ``is_ready`` first.
        """
        orig_init_state = init_state
        state_size = config.MARKOV_STATE_SIZE

        if init_state is not None:
            init_state = tuple(self.tokenize(init_state))
            size = len(init_state)
            if size < state_size:
                # Left-pad short seeds with BEGIN markers up to the state size.
                padding = (markovify.chain.BEGIN,) * (state_size - size)
                init_state = padding + init_state
            elif size > state_size:
                # A chain state holds exactly ``state_size`` tokens, so a
                # longer seed can never match one; continue from its tail.
                init_state = init_state[-state_size:]

        # NOTE(review): a seed state the chain has never seen presumably
        # raises KeyError inside markovify -- confirm and handle in callers.
        words = self.chain.walk(init_state)
        if not words:
            # Nothing was produced; fall back to an unseeded sentence.
            return self.generate()

        parts = [orig_init_state if orig_init_state is not None else ""]
        for word in words:
            # Dashes and any token that is not pure ASCII punctuation get a
            # leading space; other punctuation attaches to the previous token.
            if word in "-–—" or not all(c in string.punctuation for c in word):
                parts.append(" ")
            parts.append(word)
        return "".join(parts).strip()

    def rebuild(self):
        """Recompile the chain from the current corpus and reset the counter."""
        self.chain = markovify.Chain(self.corpus, config.MARKOV_STATE_SIZE).compile()
        self.counter = 0

    def tokenize(self, text):
        """Split ``text`` into tokens after removing @mentions and URLs.

        Args:
            text: Raw input string.

        Returns:
            A list of non-empty, whitespace-stripped token strings produced
            by the spaCy pipeline.
        """
        text = _MENTION_RE.sub("", text)
        text = _URL_RE.sub("", text)
        stripped = (str(token).strip() for token in self.nlp(text))
        return [token for token in stripped if token]

    def extend_corpus(self, text):
        """Add ``text`` to the corpus, one sentence per line.

        Blank input is ignored and multi-line input is added line by line.
        A sentence already in the corpus is not added again. The corpus is
        capped at ``config.MARKOV_CORPUS_SIZE`` entries, dropping the oldest,
        and the chain is rebuilt after every ``config.MARKOV_REBUILD_RATE``
        newly added sentences.

        Args:
            text: Raw input string, possibly spanning several lines.
        """
        text = text.strip()
        if not text:
            return

        if "\n" in text:
            for line in text.split("\n"):
                self.extend_corpus(line)
            return

        tokens = self.tokenize(text)
        if not tokens:
            # Only mentions/URLs: an empty sentence would teach the chain to
            # end immediately, making ``generate`` yield nothing.
            return
        if tokens in self.corpus:
            return

        # Newest first; the oldest entry falls off the end once over the cap.
        self.corpus.insert(0, tokens)
        if len(self.corpus) > config.MARKOV_CORPUS_SIZE:
            self.corpus.pop(-1)

        self.counter += 1
        if self.counter % config.MARKOV_REBUILD_RATE == 0:
            self.rebuild()

    def load(self):
        """Load the chain and the corpus from disk when their files exist."""
        # Explicit UTF-8 keeps file I/O independent of the platform default.
        if os.path.isfile(config.MARKOV_CHAIN_PATH):
            with open(config.MARKOV_CHAIN_PATH, "r", encoding="utf-8") as f:
                self.chain = markovify.Chain.from_json(f.read())

        if os.path.isfile(config.MARKOV_CORPUS_PATH):
            with open(config.MARKOV_CORPUS_PATH, "r", encoding="utf-8") as f:
                self.corpus = ujson.load(f)

    def save(self):
        """Write the chain (if one exists) and the corpus to disk."""
        if self.chain is not None:
            with open(config.MARKOV_CHAIN_PATH, "w", encoding="utf-8") as f:
                f.write(self.chain.to_json())

        with open(config.MARKOV_CORPUS_PATH, "w", encoding="utf-8") as f:
            ujson.dump(self.corpus, f)