1
0

tools.py 5.6 KB


  1. import re
  2. import asyncio
  3. import os.path
  4. from hashlib import sha1, scrypt
  5. import ujson
  6. import aiofiles
  7. from aiofiles.os import path
  8. from aiofiles.tempfile import TemporaryDirectory
  9. from aiohttp import ClientSession
  10. from config import config
  11. TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*$')
  12. USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+$')
  13. PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}$')
  14. SALT = config.SALT.encode('ASCII')
  15. async def save_content(content, category, filename=None, override=False):
  16. if not filename:
  17. filename = sha1(content).hexdigest()
  18. format = 'webm' if category == 'videos' else 'png'
  19. filename = f'{filename}.{format}'
  20. web_path = f'/static/{category}/{filename}'
  21. file_path = os.path.join('static', category)
  22. file_path = os.path.join(file_path, filename)
  23. file_path = os.path.join('.', file_path)
  24. if not override and await path.isfile(file_path):
  25. return web_path
  26. async with aiofiles.open(file_path, 'wb') as f:
  27. if type(content) is not bytes:
  28. await content.seek(0)
  29. while True:
  30. chunk = await content.read(8192)
  31. if not chunk:
  32. break
  33. await f.write(chunk)
  34. else:
  35. await f.write(content)
  36. return web_path
  37. async def probe_video(path):
  38. proc = await asyncio.create_subprocess_shell(
  39. f'ffprobe -v quiet -print_format json -show_format -show_streams -select_streams v {path}',
  40. stderr=asyncio.subprocess.PIPE,
  41. stdout=asyncio.subprocess.PIPE
  42. )
  43. stdout, _ = await proc.communicate()
  44. if proc.returncode != 0:
  45. return None
  46. data = ujson.loads(stdout)
  47. if 'format' not in data:
  48. return None
  49. if 'streams' not in data:
  50. return None
  51. if len(data['streams']) < 1:
  52. return None
  53. return data
  54. async def probe_image(path):
  55. proc = await asyncio.create_subprocess_shell(
  56. f'identify {path}',
  57. stderr=asyncio.subprocess.PIPE,
  58. stdout=asyncio.subprocess.PIPE
  59. )
  60. stdout, _ = await proc.communicate()
  61. if proc.returncode != 0:
  62. return None
  63. return stdout.decode('ASCII').split(' ')
  64. async def create_preview(path):
  65. async with TemporaryDirectory() as output_directory:
  66. output_path = os.path.join(output_directory, 'thumbnail.png')
  67. proc = await asyncio.create_subprocess_shell(
  68. f'ffmpeg -ss 00:00:01.00 -i {path} -vf \'scale=320:320:force_original_aspect_ratio=decrease\' -vframes 1 {output_path}',
  69. stderr=asyncio.subprocess.PIPE,
  70. stdout=asyncio.subprocess.PIPE
  71. )
  72. await proc.communicate()
  73. if proc.returncode != 0:
  74. return '/static/thumbs/default.png'
  75. async with aiofiles.open(output_path, 'rb') as f:
  76. return await save_content(await f.read(), 'thumbs')
  77. async def create_thumbnail(path, filename, dimension=128):
  78. async with TemporaryDirectory() as output_directory:
  79. output_path = os.path.join(output_directory, 'thumbnail.png')
  80. proc = await asyncio.create_subprocess_shell(
  81. f'convert {path} -thumbnail \'{dimension}x{dimension}>\' {output_path}',
  82. stderr=asyncio.subprocess.PIPE,
  83. stdout=asyncio.subprocess.PIPE
  84. )
  85. await proc.communicate()
  86. if proc.returncode == 0:
  87. async with aiofiles.open(output_path, 'rb') as f:
  88. return await save_content(await f.read(), 'avatars', filename=f'{filename}.{dimension}', override=True)
  89. return f'/static/avatars/default/default.{dimension}.png'
  90. async def verify_captcha(hcaptcha_token):
  91. async with ClientSession() as session:
  92. async with session.post('https://hcaptcha.com/siteverify',
  93. data={
  94. 'response': hcaptcha_token,
  95. 'secret': config.HCAPTCHA_SECRET
  96. }) as resp:
  97. resp = await resp.json()
  98. return resp['success']
  99. def parse_tag(tag):
  100. if len(tag) < 2:
  101. raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
  102. if len(tag) > 15:
  103. raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
  104. if not TAG_REGEX.match(tag):
  105. raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')
  106. return tag
  107. def parse_tags(tags):
  108. tags = re.sub(r'\s\s+', ' ', tags)
  109. tags = filter(lambda tag: len(tag.strip()) > 0, tags.split(' '))
  110. tags = map(lambda tag: tag.lower(), tags)
  111. tags = list(tags)
  112. if len(tags) < 1:
  113. raise SyntaxError('video should have at least 1 tag.')
  114. if len(tags) > 10:
  115. raise SyntaxError('video shouldn\'t have more than 10 tags.')
  116. for tag in tags:
  117. parse_tag(tag)
  118. tags = list(set(tags)) # Remove duplicates.
  119. return tags
  120. def validate_text(text):
  121. text = text.strip()
  122. if len(text) > 256:
  123. raise ValueError('comment is too long (max. length is 256 characters)')
  124. if len(text) < 2:
  125. raise ValueError('comment is too short (min. length is 2 characters)')
  126. return text
  127. def is_valid_username(username):
  128. if not username:
  129. return False
  130. if len(username) > config.MAX_CREDENTIALS_LENGTH or len(username) < config.MIN_CREDENTIALS_LENGTH:
  131. return False
  132. if not USERNAME_REGEX.match(username):
  133. return False
  134. return True
  135. def is_valid_password(password):
  136. if not password:
  137. return False
  138. if len(password) > config.MAX_CREDENTIALS_LENGTH or len(password) < config.MIN_CREDENTIALS_LENGTH:
  139. return False
  140. if not PASSWORD_REGEX.match(password):
  141. return False
  142. return True
  143. def hash_password(password):
  144. password = password.encode('ASCII')
  145. return scrypt(password, salt=SALT, n=2, r=8, p=1)
  146. def parse_offset(offset):
  147. try:
  148. offset = int(offset)
  149. offset = abs(offset)
  150. except ValueError:
  151. offset = 0
  152. return offset