import os.path from hashlib import sha1 from peewee import IntegrityError from aiofiles.os import path from aiofiles.tempfile import NamedTemporaryFile from db import db from db import User, Video, Like, Tag, VideoTag from tools import is_valid_username, is_valid_password from tools import hash_password from tools import probe_image, create_thumbnail from tools import probe_video, create_preview from tools import parse_tags from tools import save_content from video import VideoError from video import serialize_video, find_video_by_id from config import config class LoginError(Exception): pass class SignupError(Exception): pass class UserError(Exception): pass async def find_user(username): try: return await db.get(User .select() .where(User.username == username)) except User.DoesNotExist: raise UserError('account is disabled or doesn\'t exist') async def find_user_by_id(user_id): try: user_id = int(user_id) user_id = abs(user_id) except ValueError: raise UserError('illegal user_id') try: return await db.get(User .select() .where(User.id == user_id)) except User.DoesNotExist: raise UserError('account is disabled or doesn\'t exist') async def create_user(username, password, ip_address='127.0.0.1'): try: return await db.create(User, ip_address=ip_address, username=username, password=hash_password(password)) except IntegrityError: raise UserError('this username is already taken') async def get_authorized_user(session): if 'authorized' not in session: raise UserError('not authorized') return await find_user_by_id(session['authorized']) async def log_in(session, username, password): if 'authorized' in session: raise LoginError('already logged in') if not is_valid_username(username): raise LoginError('illegal username') if not is_valid_password(password): raise LoginError('illegal password') user = await find_user(username) hash = hash_password(password) if bytes(user.password) != hash: raise LoginError('illegal credentials') session['authorized'] = user.id return user async def sign_up(session, username, password, ip_address='127.0.0.1'): if 'authorized' in session: raise SignupError('already logged in') if not is_valid_username(username): raise SignupError('illegal username') if not is_valid_password(password): raise SignupError('illegal password') return await create_user(username, password, ip_address) async def log_out(session): if 'authorized' not in session: raise UserError('not authorized') del session['authorized'] async def is_video_liked(session, video_id): user = await get_authorized_user(session) video = await find_video_by_id(video_id) try: await db.get(user .likes .select() .join(Video) .where(Video.id == video.id)) except Like.DoesNotExist: return False return True async def like_video(session, video_id): user = await get_authorized_user(session) video = await find_video_by_id(video_id) try: await db.get(user .likes .select() .join(Video) .where(Video.id == video.id)) except Like.DoesNotExist: await db.create(Like, video=video, user=user) return raise UserError('this video is already liked') async def unlike_video(session, video_id): user = await get_authorized_user(session) video = await find_video_by_id(video_id) try: await db.delete( await db.get(user .likes .select() .join(Video) .where(Video.id == video.id)) ) except Like.DoesNotExist: raise UserError('this video is not liked yet') async def upload_avatar(session, image): user = await get_authorized_user(session) if not image: raise UserError('not a valid image') async with NamedTemporaryFile('wb') as f: while True: chunk = await image.read_chunk() if not chunk: break await f.write(chunk) image_info = await probe_image(f.name) if not image_info: raise UserError('not a valid image') if image_info[1] not in config.ALLOWED_IMAGE_FORMATS: raise UserError('this image format is not allowed') width, height = map(int, image_info[2].split('x')) if width > 2048 or height > 2048: raise UserError('image size shouldn\'t exceed 2048 pixels for each dimension') await create_thumbnail(f.name, user.id, dimension=128) await create_thumbnail(f.name, user.id, dimension=64) async def upload_video(session, video, tags): user = await get_authorized_user(session) tags = parse_tags(tags) async with NamedTemporaryFile('w+b') as f: hash = sha1(video).hexdigest() await f.write(video) video_info = await probe_video(f.name) if not video_info: raise VideoError('not a valid video') format = video_info['format'] format_name = format['format_name'].split(',') if 'webm' not in format_name: raise VideoError('not a WebM video') if float(format['duration']) > 15 * 60: raise VideoError('video duration shouldn\'t exceed 15 minutes') video_stream = video_info['streams'][0] if video_stream['width'] > 2048 or video_stream['height'] > 2048: raise VideoError('video\'s resolution shouldn\'t exceed 2048px for each dimension') web_path = await save_content(f, 'videos', filename=hash) path = os.path.join('static', 'videos') path = os.path.join(path, f'{hash}.webm') path = os.path.join('.', path) thumbnail = await create_preview(path) video = await db.create(Video, video=web_path, thumbnail=thumbnail, uploader=user) for tag in tags: tag = await db.get_or_create(Tag, tag=tag) await db.create(VideoTag, video=video, tag=tag[0]) return video async def get_user_videos(user_id, offset=0): user = await find_user_by_id(user_id) videos = await db.execute(Video .select(Video, User) .join(User) .switch(Video) .where( (Video.uploader.id == user.id) & (~Video.is_hidden) ) .order_by(Video.upload_date.desc()) .offset(offset) .limit(6)) return [await serialize_video(video) for video in videos] async def get_user_videos_count(user_id): user = await find_user_by_id(user_id) return await db.count(Video .select(Video, User) .join(User) .switch(Video) .where(Video.uploader.id == user.id)) async def get_avatar(user_id, dimension=128): filename = f'{user_id}.{dimension}.png' file_path = os.path.join('static', 'avatars') file_path = os.path.join(file_path, filename) file_path = os.path.join('.', file_path) if await path.isfile(file_path): return f'/static/avatars/{filename}' return f'/static/avatars/default/default.{dimension}.png' async def serialize_user(user): return { 'avatar64': await get_avatar(user.id, dimension=64), 'avatar128': await get_avatar(user.id, dimension=128), 'signup_ts': int(user.signup_date.timestamp()), 'username': user.username }