From 4954ab3d60974dc1bf05ad1f2e01e895d913026f Mon Sep 17 00:00:00 2001 From: shoucandanghehe Date: Mon, 10 Jun 2024 00:48:13 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20TETR.IO=20=E9=80=82=E9=85=8D=20v2?= =?UTF-8?q?=E6=A8=A1=E6=9D=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../games/tetrio/query.py | 233 +++++++++++++++--- 1 file changed, 192 insertions(+), 41 deletions(-) diff --git a/nonebot_plugin_tetris_stats/games/tetrio/query.py b/nonebot_plugin_tetris_stats/games/tetrio/query.py index 1b87e68..5180ec6 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/query.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/query.py @@ -1,17 +1,16 @@ from asyncio import gather from collections import defaultdict -from contextlib import suppress from datetime import date, datetime, timedelta, timezone from hashlib import md5 from math import ceil, floor -from typing import ClassVar +from typing import ClassVar, TypeVar, overload from urllib.parse import urlunparse from zoneinfo import ZoneInfo from aiofiles import open from nonebot import get_driver from nonebot.adapters import Event -from nonebot.compat import type_validate_json +from nonebot.compat import model_dump, type_validate_json from nonebot.matcher import Matcher from nonebot_plugin_alconna import At from nonebot_plugin_alconna.uniseg import UniMessage @@ -25,11 +24,18 @@ from sqlalchemy import select from zstandard import ZstdDecompressor from ...db import query_bind_info, trigger +from ...utils.exception import FallbackError from ...utils.host import HostPage, get_self_netloc +from ...utils.metrics import TetrisMetricsProWithPPSVS, get_metrics from ...utils.render import render from ...utils.render.schemas.base import Avatar, Ranking -from ...utils.render.schemas.tetrio_info import Data, Info, Radar, TetraLeague, TetraLeagueHistory -from ...utils.render.schemas.tetrio_info import User as TemplateUser +from ...utils.render.schemas.tetrio_info import Data, Radar, TetraLeague, TetraLeagueHistory +from ...utils.render.schemas.tetrio_info import Info as V1TemplateInfo +from ...utils.render.schemas.tetrio_info import User as V1TemplateUser +from ...utils.render.schemas.tetrio_info_v2 import Blitz, Sprint, Statistic, TetraLeagueStatistic, Zen +from ...utils.render.schemas.tetrio_info_v2 import Info as V2TemplateInfo +from ...utils.render.schemas.tetrio_info_v2 import TetraLeague as V2TemplateTetraLeague +from ...utils.render.schemas.tetrio_info_v2 import User as V2TemplateUser from ...utils.screenshot import screenshot from ...utils.typing import Me, Number from ..constant import CANT_VERIFY_MESSAGE @@ -38,9 +44,10 @@ from .api import Player, User, UserInfoSuccess from .api.models import TETRIOHistoricalData from .api.schemas.tetra_league import TetraLeagueSuccess from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague -from .api.schemas.user_records import SoloModeRecord, SoloRecord +from .api.schemas.user_records import SoloModeRecord, UserRecordsSuccess from .constant import GAME_TYPE, TR_MAX, TR_MIN from .model import IORank +from .typing import Template UTC = timezone.utc @@ -48,7 +55,13 @@ driver = get_driver() @alc.assign('TETRIO.query') -async def _(event: Event, matcher: Matcher, target: At | Me, event_session: EventSession): +async def _( + event: Event, + matcher: Matcher, + target: At | Me, + event_session: EventSession, + template: Template | None = None, +): async with trigger( session_persist_id=await get_session_persist_id(event_session), game_platform=GAME_TYPE, @@ -67,30 +80,18 @@ async def _(event: Event, matcher: Matcher, target: At | Me, event_session: Even await matcher.finish('未查询到绑定信息') message = UniMessage(CANT_VERIFY_MESSAGE) player = Player(user_id=bind.game_account, trust=True) - user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records()) - sprint = user_records.data.records.sprint - blitz = user_records.data.records.blitz - with suppress(TypeError): - message.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)) - await message.finish() - message += make_query_text(user_info, sprint, blitz) - await message.finish() + await (message + (await make_query_result(player, template or 'v1'))).finish() @alc.assign('TETRIO.query') -async def _(account: Player, event_session: EventSession): +async def _(account: Player, event_session: EventSession, template: Template | None = None): async with trigger( session_persist_id=await get_session_persist_id(event_session), game_platform=GAME_TYPE, command_type='query', command_args=[], ): - user, user_info, user_records = await gather(account.user, account.get_info(), account.get_records()) - sprint = user_records.data.records.sprint - blitz = user_records.data.records.blitz - with suppress(TypeError): - await UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)).finish() - await make_query_text(user_info, sprint, blitz).finish() + await (await make_query_result(account, template or 'v1')).finish() def get_value_bounds(values: list[int | float]) -> tuple[int, int]: @@ -212,13 +213,40 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[ return histories -async def make_query_image( - user: User, user_info: UserInfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None -) -> bytes: +L = TypeVar('L', NeverPlayedLeague, NeverRatedLeague, RatedLeague) + + +@overload +def get_league(user_info: UserInfoSuccess, league_type: type[L]) -> L: ... +@overload +def get_league( + user_info: UserInfoSuccess, league_type: None = None +) -> NeverPlayedLeague | NeverRatedLeague | RatedLeague: ... +def get_league( + user_info: UserInfoSuccess, league_type: type[L] | None = None +) -> L | NeverPlayedLeague | NeverRatedLeague | RatedLeague: league = user_info.data.user.league - if not isinstance(league, RatedLeague) or league.vs is None: - raise TypeError - user_name = user_info.data.user.username.upper() + if league_type is None: + return league + if isinstance(league, league_type): + return league + raise FallbackError + + +def get_sprint(user_records: UserRecordsSuccess) -> SoloModeRecord: + return user_records.data.records.sprint + + +def get_blitz(user_records: UserRecordsSuccess) -> SoloModeRecord: + return user_records.data.records.blitz + + +async def make_query_image_v1(player: Player) -> bytes: + user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records()) + league = get_league(user_info, RatedLeague) + sprint, blitz = get_sprint(user_records).record, get_blitz(user_records).record + if league.vs is None: + raise FallbackError histories = await query_historical_data(user, user_info) value_max, value_min = get_value_bounds([i.tr for i in histories]) split_value, offset = get_split(value_max, value_min) @@ -229,17 +257,17 @@ async def make_query_image( sprint_value = 'N/A' blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A' async with HostPage( - await render( + page=await render( 'v1/tetrio/info', - Info( - user=TemplateUser( + V1TemplateInfo( + user=V1TemplateUser( avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}' if user_info.data.user.avatar_revision is not None else Avatar( type='identicon', hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324 ), - name=user_name, + name=user.name.upper(), bio=user_info.data.user.bio, ), ranking=Ranking( @@ -280,9 +308,118 @@ async def make_query_image( return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', ''))) -def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: SoloModeRecord) -> UniMessage: - league = user_info.data.user.league - user_name = user_info.data.user.username.upper() +N = TypeVar('N', int, float) + + +def handling_special_value(value: N) -> N | None: + return value if value != -1 else None + + +async def make_query_image_v2(player: Player) -> bytes: + user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records()) + league = get_league(user_info) + sprint, blitz = get_sprint(user_records), get_blitz(user_records) + + if sprint.record is not None: + duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds() + sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004 + else: + sprint_value = 'N/A' + + play_time: str | None + if (game_time := handling_special_value(user_info.data.user.gametime)) is not None: + if game_time // 3600 > 0: + play_time = f'{game_time//3600:.0f}h {game_time % 3600 // 60:.0f}m {game_time % 60:.0f}s' + elif game_time // 60 > 0: + play_time = f'{game_time//60:.0f}m {game_time % 60:.0f}s' + else: + play_time = f'{game_time:.0f}s' + else: + play_time = game_time + + async with HostPage( + await render( + 'v2/tetrio/info', + V2TemplateInfo( + user=V2TemplateUser( + id=user.ID, + name=user.name.upper(), + bio=user_info.data.user.bio, + banner=f'https://tetr.io/user-content/banners/{user_info.data.user.id}.jpg?rv={user_info.data.user.banner_revision}' + if user_info.data.user.banner_revision is not None and user_info.data.user.banner_revision != 0 + else None, + avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}' + if user_info.data.user.avatar_revision is not None + else Avatar( + type='identicon', + hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324 + ), + badges=[i.id for i in user_info.data.user.badges], + country=user_info.data.user.country, + xp=user_info.data.user.xp, + friend_count=user_info.data.user.friend_count or 0, + supporter_tier=user_info.data.user.supporter_tier, + bad_standing=user_info.data.user.badstanding or False, + verified=user_info.data.user.verified, + playtime=play_time, + join_at=user_info.data.user.ts, + ), + tetra_league=V2TemplateTetraLeague( + rank=league.rank, + highest_rank=league.bestrank, + tr=round(league.rating, 2), + glicko=round(league.glicko, 2), + rd=round(league.rd, 2), + global_rank=handling_special_value(league.standing), + country_rank=handling_special_value(league.standing_local), + pps=( + metrics := get_metrics(pps=league.pps, apm=league.apm, vs=league.vs) + if league.vs is not None + else get_metrics(pps=league.pps, apm=league.apm) + ).pps, + apm=metrics.apm, + apl=metrics.apl, + vs=metrics.vs if isinstance(metrics, TetrisMetricsProWithPPSVS) else None, + adpl=metrics.adpl if isinstance(metrics, TetrisMetricsProWithPPSVS) else None, + statistic=TetraLeagueStatistic( + total=league.gamesplayed, + wins=league.gameswon, + ), + ) + if isinstance(league, RatedLeague) + else None, + statistic=Statistic( + total=handling_special_value(user_info.data.user.gamesplayed), + wins=handling_special_value(user_info.data.user.gameswon), + ), + sprint=Sprint( + time=sprint_value, + global_rank=sprint.rank, + play_at=sprint.record.ts, + ) + if sprint.record is not None + else None, + blitz=Blitz( + score=blitz.record.endcontext.score, + global_rank=blitz.rank, + play_at=blitz.record.ts, + ) + if blitz.record is not None + else None, + zen=Zen.model_validate(model_dump(user_records.data.zen)), + ), + ), + ) as page_hash: + return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', ''))) + + +async def make_query_text(player: Player) -> UniMessage: + user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records()) + league = get_league(user_info) + sprint, blitz = get_sprint(user_records), get_blitz(user_records) + + user_name = user.name.upper() + message = '' if isinstance(league, NeverPlayedLeague): message += f'用户 {user_name} 没有排位统计数据' @@ -295,12 +432,15 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S else: message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})' message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:' - lpm = league.pps * 24 - message += f"\nL'PM: {round(lpm, 2)} ( {league.pps} pps )" - message += f'\nAPM: {league.apm} ( x{round(league.apm/lpm,2)} )' - if league.vs is not None: - adpm = league.vs * 0.6 - message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )' + metrics = ( + get_metrics(pps=league.pps, apm=league.apm, vs=league.vs) + if league.vs is not None + else get_metrics(pps=league.pps, apm=league.apm) + ) + message += f"\nL'PM: {metrics.lpm} ( {metrics.pps} pps )" + message += f'\nAPM: {metrics.apm} ( x{metrics.apl} )' + if isinstance(metrics, TetrisMetricsProWithPPSVS): + message += f'\nADPM: {metrics.adpm} ( x{metrics.adpl} ) ( {metrics.vs}vs )' if sprint.record is not None: message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s' message += f' ( #{sprint.rank} )' if sprint.rank is not None else '' @@ -310,6 +450,17 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S return UniMessage(message) +async def make_query_result(player: Player, template: Template) -> UniMessage: + try: + if template == 'v1': + return UniMessage.image(raw=await make_query_image_v1(player)) + if template == 'v2': + return UniMessage.image(raw=await make_query_image_v2(player)) + except FallbackError: + ... + return await make_query_text(player) + + class FullExport: cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set) latest_update: ClassVar[date | None] = None