瀏覽代碼

Fix: 'magick' is not presented on all platforms.

txlyre 3 年之前
父節點
當前提交
da9ba68403
共有 1 個文件被更改,包括 213 次插入213 次删除
  1. 213 213
      tools.py

+ 213 - 213
tools.py

@@ -1,213 +1,213 @@
-import re
-import asyncio
-import os.path
-from hashlib import sha1, scrypt
-
-import ujson
-import aiofiles
-from aiofiles.os import path
-from aiofiles.tempfile import TemporaryDirectory
-from aiohttp import ClientSession
-
-from config import config
-
-TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*$')
-USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+$')
-PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}$')
-SALT = config.SALT.encode('ASCII')
-
-async def save_content(content, category, filename=None, override=False):
-  if not filename:
-    filename = sha1(content).hexdigest()
-
-  format = 'webm' if category == 'videos' else 'png'
-  filename = f'{filename}.{format}'  
-  web_path = f'/static/{category}/{filename}' 
- 
-  file_path = os.path.join('static', category)
-  file_path = os.path.join(file_path, filename)
-  file_path = os.path.join('.', file_path)  
-
-  if not override and await path.isfile(file_path):
-    return web_path
-
-  async with aiofiles.open(file_path, 'wb') as f:
-    if type(content) is not bytes:
-      await content.seek(0)
-
-      while True:
-        chunk = await content.read(8192)
-        if not chunk:
-          break
-
-        await f.write(chunk)
-    else:
-      await f.write(content)
-
-  return web_path
-
-async def probe_video(path):
-  proc = await asyncio.create_subprocess_shell(
-    f'ffprobe -v quiet -print_format json -show_format -show_streams -select_streams v {path}',
-    stderr=asyncio.subprocess.PIPE,
-    stdout=asyncio.subprocess.PIPE
-  )
-
-  stdout, _ = await proc.communicate()
-
-  if proc.returncode != 0:
-    return None
-
-  data = ujson.loads(stdout)
-  
-  if 'format' not in data:
-    return None
-
-  if 'streams' not in data:
-    return None
-
-  if len(data['streams']) < 1:
-    return None
-
-  return data
-
-async def probe_image(path):
-  proc = await asyncio.create_subprocess_shell(
-    f'magick identify {path}',
-    stderr=asyncio.subprocess.PIPE,
-    stdout=asyncio.subprocess.PIPE
-  )
-
-  stdout, _ = await proc.communicate()
-
-  if proc.returncode != 0:
-    return None
-
-  return stdout.decode('ASCII').split(' ')
-  
-async def create_preview(path):
-  async with TemporaryDirectory() as output_directory:
-    output_path = os.path.join(output_directory, 'thumbnail.png')
-
-    proc = await asyncio.create_subprocess_shell(
-      f'ffmpeg -ss 00:00:01.00 -i {path} -vf \'scale=320:320:force_original_aspect_ratio=decrease\' -vframes 1 {output_path}',
-      stderr=asyncio.subprocess.PIPE,
-      stdout=asyncio.subprocess.PIPE
-    )
-
-    await proc.communicate()
-    
-    if proc.returncode != 0:
-      return '/static/thumbs/default.png'
-   
-    async with aiofiles.open(output_path, 'rb') as f:
-      return await save_content(await f.read(), 'thumbs')
-    
-async def create_thumbnail(path, filename, dimension=128):
-  async with TemporaryDirectory() as output_directory:
-    output_path = os.path.join(output_directory, 'thumbnail.png')
-
-    proc = await asyncio.create_subprocess_shell(
-      f'convert {path} -thumbnail \'{dimension}x{dimension}>\' {output_path}',
-      stderr=asyncio.subprocess.PIPE,
-      stdout=asyncio.subprocess.PIPE
-    )
-
-    await proc.communicate()
-
-    if proc.returncode == 0:   
-      async with aiofiles.open(output_path, 'rb') as f:
-        return await save_content(await f.read(), 'avatars', filename=f'{filename}.{dimension}', override=True)   
-
-    return f'/static/avatars/default/default.{dimension}.png'
-
-async def verify_captcha(hcaptcha_token):
-  async with ClientSession() as session:
-    async with session.post('https://hcaptcha.com/siteverify',
-                            data={
-                              'response': hcaptcha_token,
-                              'secret': config.HCAPTCHA_SECRET
-                            }) as resp:
-      resp = await resp.json()
-
-      return resp['success']
-
-def parse_tag(tag):
-  if len(tag) < 2:
-    raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
-
-  if len(tag) > 15:
-    raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
-
-  if not TAG_REGEX.match(tag):
-    raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')
-
-  return tag
-
-def parse_tags(tags):
-  tags = re.sub(r'\s\s+', ' ', tags)
-  tags = filter(lambda tag: len(tag.strip()) > 0, tags.split(' '))
-  tags = map(lambda tag: tag.lower(), tags)
-  tags = list(tags)
-
-  if len(tags) < 1:
-    raise SyntaxError('video should have at least 1 tag.')
-
-  if len(tags) > 10:
-    raise SyntaxError('video shouldn\'t have more than 10 tags.')
-
-  for tag in tags:
-    parse_tag(tag)
-
-  tags = list(set(tags)) # Remove duplicates.
-
-  return tags
-
-def validate_text(text):
-  text = text.strip()
-  
-  if len(text) > 256:
-    raise ValueError('comment is too long (max. length is 256 characters)')
-
-  if len(text) < 2:
-    raise ValueError('comment is too short (min. length is 2 characters)')
-
-  return text
-
-def is_valid_username(username):
-  if not username:
-    return False
-
-  if len(username) > config.MAX_CREDENTIALS_LENGTH or len(username) < config.MIN_CREDENTIALS_LENGTH:
-    return False
-
-  if not USERNAME_REGEX.match(username):
-    return False
-
-  return True
-
-def is_valid_password(password):
-  if not password:
-    return False
-
-  if len(password) > config.MAX_CREDENTIALS_LENGTH or len(password) < config.MIN_CREDENTIALS_LENGTH:
-    return False
-
-  if not PASSWORD_REGEX.match(password):
-    return False
-
-  return True
-
-def hash_password(password):
-  password = password.encode('ASCII')
-
-  return scrypt(password, salt=SALT, n=2, r=8, p=1)
-
-def parse_offset(offset):
-  try:
-    offset = int(offset)
-    offset = abs(offset)
-  except ValueError:
-    offset = 0
-
-  return offset
+import re
+import asyncio
+import os.path
+from hashlib import sha1, scrypt
+
+import ujson
+import aiofiles
+from aiofiles.os import path
+from aiofiles.tempfile import TemporaryDirectory
+from aiohttp import ClientSession
+
+from config import config
+
+TAG_REGEX = re.compile(r'^[0-9a-z]+(-[0-9a-z]+)*$')
+USERNAME_REGEX = re.compile(r'^[0-9a-zA-Z_\-]+$')
+PASSWORD_REGEX = re.compile(r'^(?=.*[A-Za-z])(?=.*\d)[A-Za-z\d]{2,}$')
+SALT = config.SALT.encode('ASCII')
+
+async def save_content(content, category, filename=None, override=False):
+  if not filename:
+    filename = sha1(content).hexdigest()
+
+  format = 'webm' if category == 'videos' else 'png'
+  filename = f'{filename}.{format}'  
+  web_path = f'/static/{category}/{filename}' 
+ 
+  file_path = os.path.join('static', category)
+  file_path = os.path.join(file_path, filename)
+  file_path = os.path.join('.', file_path)  
+
+  if not override and await path.isfile(file_path):
+    return web_path
+
+  async with aiofiles.open(file_path, 'wb') as f:
+    if type(content) is not bytes:
+      await content.seek(0)
+
+      while True:
+        chunk = await content.read(8192)
+        if not chunk:
+          break
+
+        await f.write(chunk)
+    else:
+      await f.write(content)
+
+  return web_path
+
+async def probe_video(path):
+  proc = await asyncio.create_subprocess_shell(
+    f'ffprobe -v quiet -print_format json -show_format -show_streams -select_streams v {path}',
+    stderr=asyncio.subprocess.PIPE,
+    stdout=asyncio.subprocess.PIPE
+  )
+
+  stdout, _ = await proc.communicate()
+
+  if proc.returncode != 0:
+    return None
+
+  data = ujson.loads(stdout)
+  
+  if 'format' not in data:
+    return None
+
+  if 'streams' not in data:
+    return None
+
+  if len(data['streams']) < 1:
+    return None
+
+  return data
+
+async def probe_image(path):
+  proc = await asyncio.create_subprocess_shell(
+    f'identify {path}',
+    stderr=asyncio.subprocess.PIPE,
+    stdout=asyncio.subprocess.PIPE
+  )
+
+  stdout, _ = await proc.communicate()
+
+  if proc.returncode != 0:
+    return None
+
+  return stdout.decode('ASCII').split(' ')
+  
+async def create_preview(path):
+  async with TemporaryDirectory() as output_directory:
+    output_path = os.path.join(output_directory, 'thumbnail.png')
+
+    proc = await asyncio.create_subprocess_shell(
+      f'ffmpeg -ss 00:00:01.00 -i {path} -vf \'scale=320:320:force_original_aspect_ratio=decrease\' -vframes 1 {output_path}',
+      stderr=asyncio.subprocess.PIPE,
+      stdout=asyncio.subprocess.PIPE
+    )
+
+    await proc.communicate()
+    
+    if proc.returncode != 0:
+      return '/static/thumbs/default.png'
+   
+    async with aiofiles.open(output_path, 'rb') as f:
+      return await save_content(await f.read(), 'thumbs')
+    
+async def create_thumbnail(path, filename, dimension=128):
+  async with TemporaryDirectory() as output_directory:
+    output_path = os.path.join(output_directory, 'thumbnail.png')
+
+    proc = await asyncio.create_subprocess_shell(
+      f'convert {path} -thumbnail \'{dimension}x{dimension}>\' {output_path}',
+      stderr=asyncio.subprocess.PIPE,
+      stdout=asyncio.subprocess.PIPE
+    )
+
+    await proc.communicate()
+
+    if proc.returncode == 0:   
+      async with aiofiles.open(output_path, 'rb') as f:
+        return await save_content(await f.read(), 'avatars', filename=f'{filename}.{dimension}', override=True)   
+
+    return f'/static/avatars/default/default.{dimension}.png'
+
+async def verify_captcha(hcaptcha_token):
+  async with ClientSession() as session:
+    async with session.post('https://hcaptcha.com/siteverify',
+                            data={
+                              'response': hcaptcha_token,
+                              'secret': config.HCAPTCHA_SECRET
+                            }) as resp:
+      resp = await resp.json()
+
+      return resp['success']
+
+def parse_tag(tag):
+  if len(tag) < 2:
+    raise SyntaxError('one of the tags is too short (min. length is 2 characters).')
+
+  if len(tag) > 15:
+    raise SyntaxError('one of the tags is too long (max. length is 15 characters).')
+
+  if not TAG_REGEX.match(tag):
+    raise SyntaxError('tags can only contain digits, lowercase letters and \'-\'; tags can\'t start or end with \'-\'.')
+
+  return tag
+
+def parse_tags(tags):
+  tags = re.sub(r'\s\s+', ' ', tags)
+  tags = filter(lambda tag: len(tag.strip()) > 0, tags.split(' '))
+  tags = map(lambda tag: tag.lower(), tags)
+  tags = list(tags)
+
+  if len(tags) < 1:
+    raise SyntaxError('video should have at least 1 tag.')
+
+  if len(tags) > 10:
+    raise SyntaxError('video shouldn\'t have more than 10 tags.')
+
+  for tag in tags:
+    parse_tag(tag)
+
+  tags = list(set(tags)) # Remove duplicates.
+
+  return tags
+
+def validate_text(text):
+  text = text.strip()
+  
+  if len(text) > 256:
+    raise ValueError('comment is too long (max. length is 256 characters)')
+
+  if len(text) < 2:
+    raise ValueError('comment is too short (min. length is 2 characters)')
+
+  return text
+
+def is_valid_username(username):
+  if not username:
+    return False
+
+  if len(username) > config.MAX_CREDENTIALS_LENGTH or len(username) < config.MIN_CREDENTIALS_LENGTH:
+    return False
+
+  if not USERNAME_REGEX.match(username):
+    return False
+
+  return True
+
+def is_valid_password(password):
+  if not password:
+    return False
+
+  if len(password) > config.MAX_CREDENTIALS_LENGTH or len(password) < config.MIN_CREDENTIALS_LENGTH:
+    return False
+
+  if not PASSWORD_REGEX.match(password):
+    return False
+
+  return True
+
+def hash_password(password):
+  password = password.encode('ASCII')
+
+  return scrypt(password, salt=SALT, n=2, r=8, p=1)
+
+def parse_offset(offset):
+  try:
+    offset = int(offset)
+    offset = abs(offset)
+  except ValueError:
+    offset = 0
+
+  return offset