"""Helpers for storing and probing media files, captcha checks and input validation."""

import asyncio
import os.path
import re
from hashlib import sha1, scrypt

import aiofiles
import ujson
from aiofiles.os import path
from aiofiles.tempfile import TemporaryDirectory
from aiohttp import ClientSession

from config import config

TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*$')
USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+$')
PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}$')
SALT = config.SALT.encode('ASCII')


async def _run_command(*args):
    """Run an external program without a shell and return ``(returncode, stdout)``.

    Arguments are passed as an argv vector, so file names containing spaces
    or shell metacharacters are never interpreted by a shell.  If the program
    cannot be started at all, ``(127, b'')`` is returned, mirroring the exit
    status a shell reports for "command not found" so callers can keep
    treating any non-zero code as failure.
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *args,
            stderr=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
        )
    except OSError:
        return 127, b''

    # communicate() drains both pipes, so the child cannot block on a full pipe.
    stdout, _ = await proc.communicate()
    return proc.returncode, stdout


async def save_content(content, category, filename=None, override=False):
    """Write ``content`` under ``./static/<category>/`` and return its web path.

    Args:
        content: Either a bytes-like object, or an object exposing awaitable
            ``seek`` and ``read`` methods, which is copied in 8 KiB chunks.
        category: Sub-directory of ``static``; ``'videos'`` selects the
            ``webm`` extension, everything else gets ``png``.
        filename: Base name without extension.  When omitted, the SHA-1 hex
            digest of ``content`` is used (which requires bytes-like content).
        override: When false, an already existing file is left untouched.

    Returns:
        The path ``/static/<category>/<filename>.<extension>``.
    """
    if not filename:
        filename = sha1(content).hexdigest()

    extension = 'webm' if category == 'videos' else 'png'
    filename = f'{filename}.{extension}'

    web_path = f'/static/{category}/{filename}'
    file_path = os.path.join('.', 'static', category, filename)

    if not override and await path.isfile(file_path):
        return web_path

    async with aiofiles.open(file_path, 'wb') as f:
        if isinstance(content, (bytes, bytearray)):
            await f.write(content)
        else:
            # Stream-like content: rewind, then copy chunk by chunk.
            await content.seek(0)
            while True:
                chunk = await content.read(8192)
                if not chunk:
                    break
                await f.write(chunk)

    return web_path


async def probe_video(path):
    """Return ffprobe's JSON description of ``path``, or ``None`` on failure.

    ``None`` is returned when ffprobe exits with a non-zero status, when its
    output is not valid JSON, or when the result lacks a ``format`` entry or
    has no video stream.

    Note: inside this function the ``path`` parameter shadows the module-level
    ``aiofiles.os.path`` import.
    """
    returncode, stdout = await _run_command(
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format',
        '-show_streams',
        '-select_streams', 'v',
        str(path),
    )
    if returncode != 0:
        return None

    try:
        data = ujson.loads(stdout)
    except ValueError:
        return None

    if 'format' not in data or 'streams' not in data:
        return None
    if not data['streams']:
        return None

    return data


async def probe_image(path):
    """Run ImageMagick ``identify`` on ``path`` and return its output split on spaces.

    Returns ``None`` when ``identify`` exits with a non-zero status.  Bytes
    that are not ASCII are replaced rather than raising, so an unusual file
    name echoed in the output cannot crash the probe.
    """
    returncode, stdout = await _run_command('identify', str(path))
    if returncode != 0:
        return None

    return stdout.decode('ASCII', errors='replace').split(' ')


async def create_preview(path):
    """Grab a frame one second into the video at ``path`` and store it as a thumb.

    The frame is scaled to fit within 320x320 while keeping its aspect ratio.

    Returns:
        The web path produced by ``save_content`` for the ``thumbs`` category,
        or ``/static/thumbs/default.png`` when ffmpeg fails.
    """
    async with TemporaryDirectory() as output_directory:
        output_path = os.path.join(output_directory, 'thumbnail.png')

        returncode, _ = await _run_command(
            'ffmpeg',
            '-ss', '00:00:01.00',
            '-i', str(path),
            '-vf', 'scale=320:320:force_original_aspect_ratio=decrease',
            '-vframes', '1',
            output_path,
        )
        if returncode != 0:
            return '/static/thumbs/default.png'

        async with aiofiles.open(output_path, 'rb') as f:
            return await save_content(await f.read(), 'thumbs')


async def create_thumbnail(path, filename, dimension=128):
    """Create a ``dimension`` x ``dimension`` avatar thumbnail from the image at ``path``.

    The ``>`` geometry flag makes ImageMagick only shrink images that are
    larger than the requested size.

    Returns:
        The web path of the stored avatar (saved as ``<filename>.<dimension>``
        in the ``avatars`` category, overwriting any previous file), or the
        default avatar path for this dimension when ``convert`` fails.
    """
    async with TemporaryDirectory() as output_directory:
        output_path = os.path.join(output_directory, 'thumbnail.png')

        returncode, _ = await _run_command(
            'convert',
            str(path),
            '-thumbnail', f'{dimension}x{dimension}>',
            output_path,
        )
        if returncode == 0:
            async with aiofiles.open(output_path, 'rb') as f:
                return await save_content(
                    await f.read(),
                    'avatars',
                    filename=f'{filename}.{dimension}',
                    override=True,
                )

    return f'/static/avatars/default/default.{dimension}.png'


async def verify_captcha(hcaptcha_token):
    """Ask hCaptcha's ``siteverify`` endpoint whether ``hcaptcha_token`` is valid.

    Returns:
        The ``success`` flag of the JSON response as a bool; a response
        without that key counts as a failed verification.
    """
    form = {
        'response': hcaptcha_token,
        'secret': config.HCAPTCHA_SECRET
    }
    async with ClientSession() as session:
        async with session.post('https://hcaptcha.com/siteverify', data=form) as resp:
            payload = await resp.json()

    return bool(payload.get('success'))


def parse_tag(tag):
    """Validate a single tag and return it unchanged.

    Raises:
        SyntaxError: If the tag is shorter than 2 or longer than 15
            characters, or does not match ``TAG_REGEX``.
    """
    if len(tag) < 2:
        raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
    if len(tag) > 15:
        raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
    # fullmatch, not match: with match() the pattern's `$` would also accept
    # a tag that ends in a newline.
    if not TAG_REGEX.fullmatch(tag):
        raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')

    return tag


def parse_tags(tags):
    """Split a whitespace-separated tag string into a validated list of tags.

    Tags are lower-cased, validated with ``parse_tag`` and de-duplicated while
    keeping the order of first appearance.

    Raises:
        SyntaxError: If there are no tags, more than 10 tags (counted before
            de-duplication), or any tag is invalid.
    """
    # str.split() with no argument splits on any run of whitespace and never
    # yields empty strings, so tabs/newlines cannot end up inside a tag.
    parsed = [tag.lower() for tag in tags.split()]

    if len(parsed) < 1:
        raise SyntaxError('video should have at least 1 tag.')
    if len(parsed) > 10:
        raise SyntaxError('video shouldn\'t have more than 10 tags.')

    for tag in parsed:
        parse_tag(tag)

    # Remove duplicates; dict keys keep insertion order, so the result is stable.
    return list(dict.fromkeys(parsed))


def validate_text(text):
    """Strip ``text`` and check that its length is between 2 and 256 characters.

    Returns:
        The stripped text.

    Raises:
        ValueError: If the stripped text is too long or too short.
    """
    text = text.strip()
    length = len(text)

    if length > 256:
        raise ValueError('comment is too long (max. length is 256 characters)')
    if length < 2:
        raise ValueError('comment is too short (min. length is 2 characters)')

    return text


def _is_valid_credential(value, pattern):
    """Check ``value`` against the configured length limits and ``pattern``."""
    if not value:
        return False
    if not config.MIN_CREDENTIALS_LENGTH <= len(value) <= config.MAX_CREDENTIALS_LENGTH:
        return False
    # fullmatch so that a trailing newline is rejected (match() + `$` allows it).
    return pattern.fullmatch(value) is not None


def is_valid_username(username):
    """Return True if ``username`` is non-empty, within the length limits and matches ``USERNAME_REGEX``."""
    return _is_valid_credential(username, USERNAME_REGEX)


def is_valid_password(password):
    """Return True if ``password`` is non-empty, within the length limits and matches ``PASSWORD_REGEX``."""
    return _is_valid_credential(password, PASSWORD_REGEX)


def hash_password(password):
    """Return the scrypt digest (bytes) of ``password`` using the module-wide ``SALT``.

    The password is encoded as ASCII, so non-ASCII input raises
    ``UnicodeEncodeError``.
    """
    # NOTE(review): n=2 is an extremely low scrypt cost factor and the salt is
    # shared by every password.  Both are left as-is because changing either
    # would alter the digest of every password hashed so far; strengthening
    # them needs a migration plan.
    password = password.encode('ASCII')
    return scrypt(password, salt=SALT, n=2, r=8, p=1)


def parse_offset(offset):
    """Convert ``offset`` to a non-negative int, falling back to 0.

    Negative values are turned into their absolute value.  Anything ``int()``
    cannot convert (including ``None``) yields 0.
    """
    try:
        return abs(int(offset))
    except (TypeError, ValueError):
        return 0