Files
nonebot-plugin-tetris-stats/nonebot_plugin_tetris_stats/utils/host.py
2025-04-28 04:08:55 +08:00

112 lines
3.3 KiB
Python

from functools import cache
from hashlib import sha256
from ipaddress import IPv4Address, IPv6Address
from pathlib import Path as FilePath
from typing import TYPE_CHECKING, Annotated, ClassVar, Literal
from aiofiles import open as aopen
from fastapi import BackgroundTasks, FastAPI, Path, status
from fastapi.responses import FileResponse, HTMLResponse, Response
from fastapi.staticfiles import StaticFiles
from nonebot import get_app, get_driver
from nonebot.log import logger
from yarl import URL
from ..config.config import CACHE_PATH, config
from ..games.tetrio.api.cache import request
from .image import img_to_png
from .templates import TEMPLATES_DIR
if TYPE_CHECKING:
from pydantic import IPvAnyAddress
app: FastAPI = get_app()
driver = get_driver()
global_config = driver.config
BASE_URL = URL('https://tetr.io/user-content/')
if not isinstance(app, FastAPI):
msg = '本插件需要 FastAPI 驱动器才能运行'
raise RuntimeError(msg) # noqa: TRY004
NOT_FOUND = HTMLResponse('404 Not Found', status_code=status.HTTP_404_NOT_FOUND)
class HostPage:
pages: ClassVar[dict[str, str]] = {}
def __init__(self, page: str) -> None:
self.page_hash = sha256(page.encode()).hexdigest()
self.pages[self.page_hash] = page
async def __aenter__(self) -> str:
return self.page_hash
if not config.tetris.development:
async def __aexit__(self, exc_type, exc, tb) -> None: # noqa: ANN001
self.pages.pop(self.page_hash, None)
else:
async def __aexit__(self, exc_type, exc, tb) -> None: # noqa: ANN001
pass
@driver.on_startup
def _():
app.mount(
'/host/_nuxt',
StaticFiles(directory=TEMPLATES_DIR / '_nuxt'),
name='assets',
)
logger.success('assets mounted')
@app.get('/host/{page_hash}.html', status_code=status.HTTP_200_OK)
def _(page_hash: str) -> HTMLResponse:
if page_hash in HostPage.pages:
return HTMLResponse(HostPage.pages[page_hash])
return NOT_FOUND
@app.get('/host/resource/tetrio/{resource_type}/{user_id}', status_code=status.HTTP_200_OK)
async def _(
resource_type: Literal['avatars', 'banners'],
user_id: Annotated[str, Path(regex=r'^[a-f0-9]{24}$')],
revision: int,
background_tasks: BackgroundTasks,
) -> Response:
if not (path := CACHE_PATH / 'tetrio' / resource_type / f'{user_id}_{revision}.png').exists():
image = img_to_png(
await request.request(
BASE_URL / resource_type / f'{user_id}.jpg' % {'rv': revision},
is_json=False,
)
)
background_tasks.add_task(write_cache, path=path, data=image)
return Response(content=image, media_type='image/png')
return FileResponse(path)
async def write_cache(path: FilePath, data: bytes) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
async with aopen(path, 'wb') as file:
await file.write(data)
@cache
def get_self_netloc() -> str:
host: IPv4Address | IPv6Address | IPvAnyAddress = global_config.host
if isinstance(host, IPv4Address):
if host == IPv4Address('0.0.0.0'): # noqa: S104
host = IPv4Address('127.0.0.1')
netloc = f'{host}:{global_config.port}'
else:
if host == IPv6Address('::'):
host = IPv6Address('::1')
netloc = f'[{host}]:{global_config.port}'
return netloc