123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869 |
- import struct
- import asyncio
- from typing import Annotated
- import redis.asyncio as aioredis
- from fastapi import FastAPI, Path
- from fastapi.responses import Response
- from config import Config
# Application settings, loaded once at import time from the HJSON file.
config = Config('config.hjson')

# Shared Redis client; stays None until the startup hook assigns it.
redis = None

# ASGI application object that the route and lifecycle decorators attach to.
app = FastAPI()
@app.get(
    '/counter/{name}',
    responses={
        200: {
            'content': {'image/png': {}}
        }
    },
    response_class=Response
)
async def get_counter(
    name: Annotated[
        str,
        Path(
            min_length=1,
            max_length=50,
            pattern=r'^[a-zA-Z0-9А-Яа-я\-\.,_@#:]+$'
        )
    ]
):
    """Increment the hit counter called *name* and return it as a PNG.

    The new counter value is written to the stdin of the external program
    at ``config.CounterPath``; whatever that program prints on stdout is
    sent back to the client with the ``image/png`` media type.

    Args:
        name: Counter key taken from the URL path (1-50 characters,
            restricted to the character set in the path pattern).

    Returns:
        A ``Response`` whose body is the subprocess's stdout.
    """
    # INCR is a single atomic Redis command: a missing key is treated as 0,
    # so the first hit yields 1. A separate exists/get/set sequence would
    # let concurrent requests read the same value and lose increments.
    number = await redis.incr(name)

    proc = await asyncio.create_subprocess_exec(
        config.CounterPath,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )
    # 'L' packs a native unsigned long (platform-dependent size and byte
    # order); the format is kept as-is because the renderer reads it.
    payload = struct.pack('L', number)
    stdout, _ = await proc.communicate(input=payload)
    # NOTE(review): the child's exit status is not inspected, so a failed
    # render is still served as image/png -- confirm whether that is intended.

    return Response(
        content=stdout,
        media_type='image/png'
    )
@app.on_event('startup')
async def startup_event():
    """Create the Redis client from ``config.RedisURL`` at application startup.

    The client is stored in the module-level ``redis`` name so the request
    handlers can use it.
    """
    global redis
    client = await aioredis.from_url(config.RedisURL)
    redis = client
@app.on_event('shutdown')
async def shutdown_event():
    """Close the shared Redis client at application shutdown, if one exists."""
    # Nothing to release when the startup hook never assigned a client.
    if redis is None:
        return
    await redis.close()
|