| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 | 
							- import re
 
- import asyncio
 
- import os.path
 
- from hashlib import sha1, scrypt
 
- import ujson
 
- import aiofiles
 
- from aiofiles.os import path
 
- from aiofiles.tempfile import TemporaryDirectory
 
- from aiohttp import ClientSession
 
- from config import config
 
# Tag: runs of lowercase ASCII letters/digits joined by single dashes.
# The end is anchored with \Z rather than $, because $ also matches just
# before a trailing newline and would let 'abc\n' through.
TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*\Z')

# Username: ASCII letters, digits, '_' and '-'.
USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+\Z')

# Password: letters and digits only, with at least one of each.
# re.ASCII restricts \d to 0-9; without it \d also matches non-ASCII Unicode
# digits, which cannot be encoded as ASCII when the password is hashed.
PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}\Z', re.ASCII)
 
# Salt passed to scrypt by hash_password; read from the project config once
# at import time and kept as ASCII-encoded bytes.
SALT = config.SALT.encode('ASCII')
 
async def save_content(content, category, filename=None, override=False):
  """Write content to ./static/<category>/ and return its web path.

  Args:
    content: A bytes-like object, or an object with awaitable ``seek`` and
      ``read`` methods, which is copied in 8 KiB chunks.
    category: Sub-directory of ``static``. 'videos' are stored with a
      '.webm' extension, everything else with '.png'.
    filename: Base name without extension. When omitted, the SHA-1 hex
      digest of the content is used.
    override: When false, an already existing file is left untouched.

  Returns:
    The '/static/<category>/<filename>.<ext>' path of the stored file.
  """
  is_raw = isinstance(content, (bytes, bytearray, memoryview))

  if not filename:
    if is_raw:
      filename = sha1(content).hexdigest()
    else:
      # sha1() cannot consume a stream object directly, so feed it the
      # stream chunk by chunk instead.
      digest = sha1()
      await content.seek(0)
      while True:
        chunk = await content.read(8192)
        if not chunk:
          break
        digest.update(chunk)
      filename = digest.hexdigest()

  extension = 'webm' if category == 'videos' else 'png'
  filename = f'{filename}.{extension}'
  web_path = f'/static/{category}/{filename}'
  file_path = os.path.join('.', 'static', category, filename)

  if not override and await path.isfile(file_path):
    return web_path

  async with aiofiles.open(file_path, 'wb') as f:
    if is_raw:
      await f.write(content)
    else:
      # Rewind first: the stream may already have been read (e.g. for hashing).
      await content.seek(0)
      while True:
        chunk = await content.read(8192)
        if not chunk:
          break
        await f.write(chunk)

  return web_path
 
async def probe_video(path):
  """Inspect a video file with ffprobe.

  Args:
    path: Location of the file to inspect.

  Returns:
    The parsed ffprobe JSON output when it has a 'format' entry and at
    least one entry in 'streams'; None when ffprobe cannot be run, exits
    with a non-zero status, or the output lacks those entries.
  """
  try:
    # Arguments are passed as a list without a shell, so spaces or shell
    # metacharacters in `path` are never interpreted.
    proc = await asyncio.create_subprocess_exec(
      'ffprobe',
      '-v', 'quiet',
      '-print_format', 'json',
      '-show_format',
      '-show_streams',
      '-select_streams', 'v',
      str(path),
      stderr=asyncio.subprocess.PIPE,
      stdout=asyncio.subprocess.PIPE
    )
  except OSError:
    # The executable could not be started. Through a shell this surfaced as
    # a non-zero exit status, i.e. the same None result.
    return None

  stdout, _ = await proc.communicate()
  if proc.returncode != 0:
    return None

  data = ujson.loads(stdout)
  if 'format' not in data:
    return None
  if not data.get('streams'):
    return None
  return data
 
async def probe_image(path):
  """Run ``identify`` on an image file.

  Args:
    path: Location of the file to inspect.

  Returns:
    The command's standard output split on single spaces, or None when the
    command cannot be run or exits with a non-zero status.
  """
  try:
    # Arguments are passed as a list without a shell, so spaces or shell
    # metacharacters in `path` are never interpreted.
    proc = await asyncio.create_subprocess_exec(
      'identify',
      str(path),
      stderr=asyncio.subprocess.PIPE,
      stdout=asyncio.subprocess.PIPE
    )
  except OSError:
    # The executable could not be started. Through a shell this surfaced as
    # a non-zero exit status, i.e. the same None result.
    return None

  stdout, _ = await proc.communicate()
  if proc.returncode != 0:
    return None

  # Undecodable bytes are replaced instead of raising, so unexpected
  # non-ASCII output cannot crash the probe.
  return stdout.decode('ASCII', errors='replace').split(' ')
 
-   
 
async def create_preview(path):
  """Grab a frame one second into a video and store it as a thumbnail.

  The frame is scaled to fit within 320x320 while keeping its aspect ratio,
  written to a temporary directory, and then saved through save_content
  under the 'thumbs' category.

  Args:
    path: Location of the source video.

  Returns:
    The web path of the stored thumbnail, or '/static/thumbs/default.png'
    when ffmpeg cannot be run or exits with a non-zero status.
  """
  async with TemporaryDirectory() as output_directory:
    output_path = os.path.join(output_directory, 'thumbnail.png')

    try:
      # Arguments are passed as a list without a shell, so spaces or shell
      # metacharacters in `path` are never interpreted.
      proc = await asyncio.create_subprocess_exec(
        'ffmpeg',
        '-ss', '00:00:01.00',
        '-i', str(path),
        '-vf', 'scale=320:320:force_original_aspect_ratio=decrease',
        '-vframes', '1',
        output_path,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
      )
    except OSError:
      # ffmpeg could not be started; treat it like a failed conversion.
      return '/static/thumbs/default.png'

    await proc.communicate()
    if proc.returncode != 0:
      return '/static/thumbs/default.png'

    async with aiofiles.open(output_path, 'rb') as f:
      return await save_content(await f.read(), 'thumbs')
 
-     
 
async def create_thumbnail(path, filename, dimension=128):
  """Create a square-bounded avatar thumbnail with ``convert``.

  The result is written to a temporary directory and then saved through
  save_content under the 'avatars' category as '<filename>.<dimension>',
  overwriting any previous file of that name.

  Args:
    path: Location of the source image.
    filename: Base name for the stored thumbnail.
    dimension: Maximum width and height in pixels; the '>' geometry flag
      is passed along with it.

  Returns:
    The web path of the stored thumbnail, or the default avatar path for
    this dimension when convert cannot be run or exits with a non-zero
    status.
  """
  async with TemporaryDirectory() as output_directory:
    output_path = os.path.join(output_directory, 'thumbnail.png')

    try:
      # Arguments are passed as a list without a shell, so spaces or shell
      # metacharacters in `path` are never interpreted and the geometry
      # needs no quoting.
      proc = await asyncio.create_subprocess_exec(
        'convert',
        str(path),
        '-thumbnail', f'{dimension}x{dimension}>',
        output_path,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
      )
    except OSError:
      # convert could not be started; fall back to the default avatar.
      return f'/static/avatars/default/default.{dimension}.png'

    await proc.communicate()
    if proc.returncode == 0:
      async with aiofiles.open(output_path, 'rb') as f:
        return await save_content(await f.read(), 'avatars', filename=f'{filename}.{dimension}', override=True)

    return f'/static/avatars/default/default.{dimension}.png'
 
async def verify_captcha(hcaptcha_token):
  """Check an hCaptcha token against the siteverify endpoint.

  Posts the token together with config.HCAPTCHA_SECRET and returns the
  'success' field of the JSON reply.
  """
  async with ClientSession() as session:
    payload = {
      'response': hcaptcha_token,
      'secret': config.HCAPTCHA_SECRET
    }
    async with session.post('https://hcaptcha.com/siteverify', data=payload) as reply:
      verdict = await reply.json()
      return verdict['success']
 
def parse_tag(tag):
  """Validate a single tag and return it unchanged.

  Args:
    tag: The tag text to check.

  Returns:
    The same tag when it is valid.

  Raises:
    SyntaxError: If the tag is shorter than 2 or longer than 15 characters,
      or does not match TAG_REGEX.
  """
  if len(tag) < 2:
    raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
  if len(tag) > 15:
    raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
  # fullmatch() requires the whole string to match; match() combined with a
  # trailing '$' in a pattern also accepts a tag that ends in a newline.
  if not TAG_REGEX.fullmatch(tag):
    raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')
  return tag
 
def parse_tags(tags):
  """Split a whitespace-separated tag string into validated, unique tags.

  Args:
    tags: Raw tag text; tags may be separated by any whitespace.

  Returns:
    The lower-cased tags with duplicates removed, in first-seen order.

  Raises:
    SyntaxError: If there are no tags, more than 10 tags (counted before
      duplicates are removed), or any tag is rejected by parse_tag.
  """
  # split() with no argument splits on any run of whitespace and drops empty
  # pieces, so a single tab or newline separates tags just like a space.
  tags = [tag.lower() for tag in tags.split()]

  if len(tags) < 1:
    raise SyntaxError('video should have at least 1 tag.')
  if len(tags) > 10:
    raise SyntaxError('video shouldn\'t have more than 10 tags.')

  for tag in tags:
    parse_tag(tag)

  # dict.fromkeys removes duplicates while keeping the order of first
  # appearance, so the result is deterministic.
  return list(dict.fromkeys(tags))
 
def validate_text(text):
  """Trim surrounding whitespace from a comment and enforce its length.

  Returns the trimmed text. Raises ValueError when the trimmed text is
  longer than 256 or shorter than 2 characters.
  """
  trimmed = text.strip()
  size = len(trimmed)

  if size > 256:
    raise ValueError('comment is too long (max. length is 256 characters)')
  if size < 2:
    raise ValueError('comment is too short (min. length is 2 characters)')

  return trimmed
 
def is_valid_username(username):
  """Return True when username is acceptable, False otherwise.

  A username must be non-empty, have a length between
  config.MIN_CREDENTIALS_LENGTH and config.MAX_CREDENTIALS_LENGTH
  (inclusive), and match USERNAME_REGEX in full.
  """
  if not username:
    return False
  if len(username) > config.MAX_CREDENTIALS_LENGTH or len(username) < config.MIN_CREDENTIALS_LENGTH:
    return False
  # fullmatch() requires the whole string to match; match() combined with a
  # trailing '$' in a pattern also accepts a name that ends in a newline.
  return USERNAME_REGEX.fullmatch(username) is not None
 
def is_valid_password(password):
  """Return True when password is acceptable, False otherwise.

  A password must be non-empty, have a length between
  config.MIN_CREDENTIALS_LENGTH and config.MAX_CREDENTIALS_LENGTH
  (inclusive), be encodable as ASCII, and match PASSWORD_REGEX in full.
  """
  if not password:
    return False
  if len(password) > config.MAX_CREDENTIALS_LENGTH or len(password) < config.MIN_CREDENTIALS_LENGTH:
    return False
  try:
    # hash_password encodes the password as ASCII; '\d' in a str pattern can
    # match non-ASCII digits, so reject those here instead of failing later.
    password.encode('ASCII')
  except UnicodeEncodeError:
    return False
  # fullmatch() requires the whole string to match; match() combined with a
  # trailing '$' in a pattern also accepts a password ending in a newline.
  return PASSWORD_REGEX.fullmatch(password) is not None
 
def hash_password(password):
  """Return the scrypt digest (bytes) of an ASCII password.

  The password is encoded as ASCII and hashed with the module-level SALT.
  """
  # NOTE(review): n=2 is a very low scrypt cost factor. It is kept as is
  # because changing any parameter changes the digest produced for the same
  # password.
  encoded = password.encode('ASCII')
  digest = scrypt(encoded, salt=SALT, n=2, r=8, p=1)
  return digest
 
def parse_offset(offset):
  """Convert a raw offset value to a non-negative integer.

  Args:
    offset: Anything int() accepts, e.g. a numeric string.

  Returns:
    The absolute value of the parsed integer, or 0 when the value cannot
    be converted.
  """
  try:
    return abs(int(offset))
  except (TypeError, ValueError, OverflowError):
    # TypeError covers None (e.g. a missing parameter); OverflowError covers
    # non-finite floats. Both fall back to 0 like any other bad value.
    return 0
 
 
  |