123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- import re
- import asyncio
- import os.path
- from hashlib import sha1, scrypt
- import ujson
- import aiofiles
- from aiofiles.os import path
- from aiofiles.tempfile import TemporaryDirectory
- from aiohttp import ClientSession
- from config import config
- TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*$')
- USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+$')
- PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}$')
- SALT = config.SALT.encode('ASCII')
- async def save_content(content, category, filename=None, override=False):
- if not filename:
- filename = sha1(content).hexdigest()
- format = 'webm' if category == 'videos' else 'png'
- filename = f'{filename}.{format}'
- web_path = f'/static/{category}/{filename}'
-
- file_path = os.path.join('static', category)
- file_path = os.path.join(file_path, filename)
- file_path = os.path.join('.', file_path)
- if not override and await path.isfile(file_path):
- return web_path
- async with aiofiles.open(file_path, 'wb') as f:
- if type(content) is not bytes:
- await content.seek(0)
- while True:
- chunk = await content.read(8192)
- if not chunk:
- break
- await f.write(chunk)
- else:
- await f.write(content)
- return web_path
- async def probe_video(path):
- proc = await asyncio.create_subprocess_shell(
- f'ffprobe -v quiet -print_format json -show_format -show_streams -select_streams v {path}',
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE
- )
- stdout, _ = await proc.communicate()
- if proc.returncode != 0:
- return None
- data = ujson.loads(stdout)
-
- if 'format' not in data:
- return None
- if 'streams' not in data:
- return None
- if len(data['streams']) < 1:
- return None
- return data
- async def probe_image(path):
- proc = await asyncio.create_subprocess_shell(
- f'identify {path}',
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE
- )
- stdout, _ = await proc.communicate()
- if proc.returncode != 0:
- return None
- return stdout.decode('ASCII').split(' ')
-
- async def create_preview(path):
- async with TemporaryDirectory() as output_directory:
- output_path = os.path.join(output_directory, 'thumbnail.png')
- proc = await asyncio.create_subprocess_shell(
- f'ffmpeg -ss 00:00:01.00 -i {path} -vf \'scale=320:320:force_original_aspect_ratio=decrease\' -vframes 1 {output_path}',
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE
- )
- await proc.communicate()
-
- if proc.returncode != 0:
- return '/static/thumbs/default.png'
-
- async with aiofiles.open(output_path, 'rb') as f:
- return await save_content(await f.read(), 'thumbs')
-
- async def create_thumbnail(path, filename, dimension=128):
- async with TemporaryDirectory() as output_directory:
- output_path = os.path.join(output_directory, 'thumbnail.png')
- proc = await asyncio.create_subprocess_shell(
- f'convert {path} -thumbnail \'{dimension}x{dimension}>\' {output_path}',
- stderr=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE
- )
- await proc.communicate()
- if proc.returncode == 0:
- async with aiofiles.open(output_path, 'rb') as f:
- return await save_content(await f.read(), 'avatars', filename=f'{filename}.{dimension}', override=True)
- return f'/static/avatars/default/default.{dimension}.png'
- async def verify_captcha(hcaptcha_token):
- async with ClientSession() as session:
- async with session.post('https://hcaptcha.com/siteverify',
- data={
- 'response': hcaptcha_token,
- 'secret': config.HCAPTCHA_SECRET
- }) as resp:
- resp = await resp.json()
- return resp['success']
- def parse_tag(tag):
- if len(tag) < 2:
- raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
- if len(tag) > 15:
- raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
- if not TAG_REGEX.match(tag):
- raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')
- return tag
- def parse_tags(tags):
- tags = re.sub(r'\s\s+', ' ', tags)
- tags = filter(lambda tag: len(tag.strip()) > 0, tags.split(' '))
- tags = map(lambda tag: tag.lower(), tags)
- tags = list(tags)
- if len(tags) < 1:
- raise SyntaxError('video should have at least 1 tag.')
- if len(tags) > 10:
- raise SyntaxError('video shouldn\'t have more than 10 tags.')
- for tag in tags:
- parse_tag(tag)
- tags = list(set(tags)) # Remove duplicates.
- return tags
- def validate_text(text):
- text = text.strip()
-
- if len(text) > 256:
- raise ValueError('comment is too long (max. length is 256 characters)')
- if len(text) < 2:
- raise ValueError('comment is too short (min. length is 2 characters)')
- return text
- def is_valid_username(username):
- if not username:
- return False
- if len(username) > config.MAX_CREDENTIALS_LENGTH or len(username) < config.MIN_CREDENTIALS_LENGTH:
- return False
- if not USERNAME_REGEX.match(username):
- return False
- return True
- def is_valid_password(password):
- if not password:
- return False
- if len(password) > config.MAX_CREDENTIALS_LENGTH or len(password) < config.MIN_CREDENTIALS_LENGTH:
- return False
- if not PASSWORD_REGEX.match(password):
- return False
- return True
- def hash_password(password):
- password = password.encode('ASCII')
- return scrypt(password, salt=SALT, n=2, r=8, p=1)
- def parse_offset(offset):
- try:
- offset = int(offset)
- offset = abs(offset)
- except ValueError:
- offset = 0
- return offset
|