From 18d8e0cdcc53bd86e49250c521cd6f96ce6dc1eb Mon Sep 17 00:00:00 2001 From: shoucandanghehe Date: Wed, 15 May 2024 13:34:28 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E5=B0=86=E6=9C=AC=E5=9C=B0?= =?UTF-8?q?=E5=AD=98=E5=82=A8=E7=9A=84=20TetraLeague=20FullExport=20?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E8=81=9A=E5=90=88=E8=BF=9B=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E5=9B=BE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io_data_processor/query.py | 106 ++++++++++++++++-- 1 file changed, 99 insertions(+), 7 deletions(-) diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/query.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/query.py index b573e84..63fcf0d 100644 --- a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/query.py +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/query.py @@ -1,19 +1,27 @@ import contextlib from asyncio import gather -from datetime import datetime, timedelta, timezone +from collections import defaultdict +from datetime import date, datetime, timedelta, timezone from hashlib import md5 from math import ceil, floor +from typing import ClassVar from urllib.parse import urlunparse from zoneinfo import ZoneInfo +from aiofiles import open +from nonebot import get_driver from nonebot.adapters import Bot, Event +from nonebot.compat import type_validate_json from nonebot.matcher import Matcher from nonebot_plugin_alconna import At from nonebot_plugin_alconna.uniseg import UniMessage +from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped] +from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped] from nonebot_plugin_orm import get_session from nonebot_plugin_session import EventSession # type: ignore[import-untyped] from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped] from sqlalchemy import select +from zstandard import ZstdDecompressor from ...db import query_bind_info, trigger from ...utils.host import HostPage, get_self_netloc @@ -23,17 +31,21 @@ from ...utils.render.schemas.base import Avatar from ...utils.render.schemas.tetrio_info import Data, Radar, Ranking, TetraLeague, TetraLeagueHistory from ...utils.render.schemas.tetrio_info import User as TemplateUser from ...utils.screenshot import screenshot -from ...utils.typing import Me +from ...utils.typing import Me, Number from ..constant import CANT_VERIFY_MESSAGE from . import alc from .api import Player, User, UserInfoSuccess from .api.models import TETRIOHistoricalData +from .api.schemas.tetra_league import TetraLeagueSuccess from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague from .api.schemas.user_records import SoloModeRecord, SoloRecord from .constant import GAME_TYPE, TR_MAX, TR_MIN +from .model import IORank UTC = timezone.utc +driver = get_driver() + @alc.assign('query') async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession): @@ -57,9 +69,9 @@ async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_ses user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records()) sprint = user_records.data.records.sprint blitz = user_records.data.records.blitz - with contextlib.suppress(TypeError): - message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)) - await message.finish() + # with contextlib.suppress(TypeError): + message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)) + await message.finish() message += make_query_text(user_info, sprint, blitz) await message.finish() @@ -156,7 +168,8 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[ if extra is not None: historical_data = list(historical_data) historical_data.append(extra) - if not historical_data: + full_export_data = FullExport.get_data(user.unique_identifier) + if not historical_data and not full_export_data: return [ Data(record_at=today - forward, tr=user_info.data.user.league.rating), Data(record_at=today.replace(microsecond=1000), tr=user_info.data.user.league.rating), @@ -168,7 +181,7 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[ ) for i in historical_data if isinstance(i.data, UserInfoSuccess) and isinstance(i.data.data.user.league, RatedLeague) - ] + ] + full_export_data # 按照时间排序 histories = sorted(histories, key=lambda x: x.record_at) @@ -294,3 +307,82 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S message += f'\nBlitz: {blitz.record.endcontext.score}' message += f' ( #{blitz.rank} )' if blitz.rank is not None else '' return UniMessage(message) + + +class FullExport: + cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set) + latest_update: ClassVar[date | None] = None + + @classmethod + async def init(cls) -> None: + async with get_session() as session: + full_exports = (await session.scalars(select(IORank).where(IORank.update_time >= cls.start_time()))).all() + await gather( + *[ + cls._load(update_time, file_hash) + for file_hash, update_time in { + i.file_hash: i.update_time for i in full_exports if i.file_hash is not None + }.items() + ] + ) + + @classmethod + async def update(cls) -> None: + if cls.latest_update == datetime.now(tz=ZoneInfo('Asia/Shanghai')).date(): + return + start_time = cls.start_time() + for i in cls.cache: + cls.cache[i] = {j for j in cls.cache[i] if j[0] >= start_time} + latest_time = max(cls.cache) + async with get_session() as session: + full_exports = (await session.scalars(select(IORank).where(IORank.update_time > latest_time))).all() + await gather( + *[ + cls._load(update_time, file_hash) + for file_hash, update_time in { + i.file_hash: i.update_time for i in full_exports if i.file_hash is not None + }.items() + ] + ) + cls.latest_update = datetime.now(tz=ZoneInfo('Asia/Shanghai')).date() + + @classmethod + def get_data(cls, unique_identifier: str) -> list[Data]: + return [Data(record_at=i[0], tr=i[1]) for i in cls.cache[unique_identifier]] + + @classmethod + def start_time(cls) -> datetime: + return ( + datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) + - timedelta(days=9) + ).astimezone(UTC) + + @classmethod + async def _load(cls, update_time: datetime, file_hash: str) -> None: + try: + users = type_validate_json(TetraLeagueSuccess, await cls.decompress(file_hash)).data.users + except FileNotFoundError: + await cls.clear_invalid(file_hash) + return + update_time = update_time.astimezone(ZoneInfo('Asia/Shanghai')) + for i in users: + cls.cache[i.id].add((update_time, i.league.rating)) + + @classmethod + async def decompress(cls, file_hash: str) -> bytes: + async with open(get_data_file('nonebot_plugin_tetris_stats', f'{file_hash}.json.zst'), mode='rb') as file: + return ZstdDecompressor().decompress(await file.read()) + + @classmethod + async def clear_invalid(cls, file_hash: str) -> None: + async with get_session() as session: + full_exports = (await session.scalars(select(IORank).where(IORank.file_hash == file_hash))).all() + for i in full_exports: + i.file_hash = None + await session.commit() + + +@driver.on_startup +async def _(): + await FullExport.init() + scheduler.add_job(FullExport.update, 'interval', hours=1)