将本地存储的 TetraLeague FullExport 数据聚合进查询图

This commit is contained in:
2024-05-15 13:34:28 +08:00
parent b37f927be6
commit 18d8e0cdcc

View File

@@ -1,19 +1,27 @@
import contextlib
from asyncio import gather
from datetime import datetime, timedelta, timezone
from collections import defaultdict
from datetime import date, datetime, timedelta, timezone
from hashlib import md5
from math import ceil, floor
from typing import ClassVar
from urllib.parse import urlunparse
from zoneinfo import ZoneInfo
from aiofiles import open
from nonebot import get_driver
from nonebot.adapters import Bot, Event
from nonebot.compat import type_validate_json
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped]
from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped]
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from sqlalchemy import select
from zstandard import ZstdDecompressor
from ...db import query_bind_info, trigger
from ...utils.host import HostPage, get_self_netloc
@@ -23,17 +31,21 @@ from ...utils.render.schemas.base import Avatar
from ...utils.render.schemas.tetrio_info import Data, Radar, Ranking, TetraLeague, TetraLeagueHistory
from ...utils.render.schemas.tetrio_info import User as TemplateUser
from ...utils.screenshot import screenshot
from ...utils.typing import Me
from ...utils.typing import Me, Number
from ..constant import CANT_VERIFY_MESSAGE
from . import alc
from .api import Player, User, UserInfoSuccess
from .api.models import TETRIOHistoricalData
from .api.schemas.tetra_league import TetraLeagueSuccess
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
from .api.schemas.user_records import SoloModeRecord, SoloRecord
from .constant import GAME_TYPE, TR_MAX, TR_MIN
from .model import IORank
UTC = timezone.utc
driver = get_driver()
@alc.assign('query')
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
@@ -57,9 +69,9 @@ async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_ses
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
with contextlib.suppress(TypeError):
message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
await message.finish()
# with contextlib.suppress(TypeError):
message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
await message.finish()
message += make_query_text(user_info, sprint, blitz)
await message.finish()
@@ -156,7 +168,8 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[
if extra is not None:
historical_data = list(historical_data)
historical_data.append(extra)
if not historical_data:
full_export_data = FullExport.get_data(user.unique_identifier)
if not historical_data and not full_export_data:
return [
Data(record_at=today - forward, tr=user_info.data.user.league.rating),
Data(record_at=today.replace(microsecond=1000), tr=user_info.data.user.league.rating),
@@ -168,7 +181,7 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[
)
for i in historical_data
if isinstance(i.data, UserInfoSuccess) and isinstance(i.data.data.user.league, RatedLeague)
]
] + full_export_data
# 按照时间排序
histories = sorted(histories, key=lambda x: x.record_at)
@@ -294,3 +307,82 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S
message += f'\nBlitz: {blitz.record.endcontext.score}'
message += f' ( #{blitz.rank} )' if blitz.rank is not None else ''
return UniMessage(message)
class FullExport:
cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set)
latest_update: ClassVar[date | None] = None
@classmethod
async def init(cls) -> None:
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.update_time >= cls.start_time()))).all()
await gather(
*[
cls._load(update_time, file_hash)
for file_hash, update_time in {
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
}.items()
]
)
@classmethod
async def update(cls) -> None:
if cls.latest_update == datetime.now(tz=ZoneInfo('Asia/Shanghai')).date():
return
start_time = cls.start_time()
for i in cls.cache:
cls.cache[i] = {j for j in cls.cache[i] if j[0] >= start_time}
latest_time = max(cls.cache)
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.update_time > latest_time))).all()
await gather(
*[
cls._load(update_time, file_hash)
for file_hash, update_time in {
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
}.items()
]
)
cls.latest_update = datetime.now(tz=ZoneInfo('Asia/Shanghai')).date()
@classmethod
def get_data(cls, unique_identifier: str) -> list[Data]:
return [Data(record_at=i[0], tr=i[1]) for i in cls.cache[unique_identifier]]
@classmethod
def start_time(cls) -> datetime:
return (
datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
- timedelta(days=9)
).astimezone(UTC)
@classmethod
async def _load(cls, update_time: datetime, file_hash: str) -> None:
try:
users = type_validate_json(TetraLeagueSuccess, await cls.decompress(file_hash)).data.users
except FileNotFoundError:
await cls.clear_invalid(file_hash)
return
update_time = update_time.astimezone(ZoneInfo('Asia/Shanghai'))
for i in users:
cls.cache[i.id].add((update_time, i.league.rating))
@classmethod
async def decompress(cls, file_hash: str) -> bytes:
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{file_hash}.json.zst'), mode='rb') as file:
return ZstdDecompressor().decompress(await file.read())
@classmethod
async def clear_invalid(cls, file_hash: str) -> None:
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.file_hash == file_hash))).all()
for i in full_exports:
i.file_hash = None
await session.commit()
@driver.on_startup
async def _():
await FullExport.init()
scheduler.add_job(FullExport.update, 'interval', hours=1)