import asyncio from typing import Annotated import redis.asyncio as aioredis from aiofiles.tempfile import NamedTemporaryFile from fastapi import FastAPI, Path from fastapi.responses import Response from config import Config config = Config('config.hjson') redis = None app = FastAPI() @app.get( '/counter/{name}', responses = { 200: { 'content': {'image/png': {}} } }, response_class=Response ) async def get_counter( name: Annotated[ str, Path( min_length=1, max_length=50, pattern=r'^[a-zA-Z0-9_]+$' ) ] ): if await redis.exists(name): number = await redis.get(name) number = int(number) + 1 else: number = 1 await redis.set(name, str(number)) async with NamedTemporaryFile('rb') as f: proc = await asyncio.create_subprocess_shell( f'{config.CounterPath} {number} {f.name}' ) await proc.communicate() return Response( content=await f.read(), media_type='image/png' ) @app.on_event('startup') async def startup_event(): global redis redis = await aioredis.from_url( config.RedisURL ) @app.on_event('shutdown') async def shutdown_event(): if redis is not None: await redis.close()