From 1e5b00a280f1fea55576924750d9a8f86a29ccef Mon Sep 17 00:00:00 2001 From: shoucandanghehe Date: Tue, 6 Aug 2024 15:29:32 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20=E5=88=9D=E6=AD=A5=E9=87=8D?= =?UTF-8?q?=E6=96=B0=E9=80=82=E9=85=8D=20TETR.IO=20query?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../games/tetrio/__init__.py | 3 +- .../games/tetrio/query.py | 242 ++++++++++++++++++ 2 files changed, 244 insertions(+), 1 deletion(-) create mode 100644 nonebot_plugin_tetris_stats/games/tetrio/query.py diff --git a/nonebot_plugin_tetris_stats/games/tetrio/__init__.py b/nonebot_plugin_tetris_stats/games/tetrio/__init__.py index ba55712..4eb9649 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/__init__.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/__init__.py @@ -23,7 +23,7 @@ command = Subcommand( ) -from . import bind, config, record # noqa: E402 +from . import bind, config, query, record # noqa: E402 main_command.add(command) @@ -31,5 +31,6 @@ __all__ = [ 'alc', 'bind', 'config', + 'query', 'record', ] diff --git a/nonebot_plugin_tetris_stats/games/tetrio/query.py b/nonebot_plugin_tetris_stats/games/tetrio/query.py new file mode 100644 index 0000000..feb34a9 --- /dev/null +++ b/nonebot_plugin_tetris_stats/games/tetrio/query.py @@ -0,0 +1,242 @@ +from asyncio import gather +from datetime import datetime, timedelta, timezone +from hashlib import md5 +from typing import TYPE_CHECKING, TypeVar +from urllib.parse import urlencode + +from arclet.alconna import Arg, ArgFlag +from nepattern import parser # type: ignore[import-untyped] +from nonebot import get_driver +from nonebot.adapters import Event +from nonebot.matcher import Matcher +from nonebot_plugin_alconna import Args, At, Option, Subcommand +from nonebot_plugin_alconna.uniseg import UniMessage +from nonebot_plugin_orm import get_session +from nonebot_plugin_session import EventSession +from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped] +from nonebot_plugin_user import User as NBUser +from nonebot_plugin_user import get_user +from sqlalchemy import select + +from ...db import query_bind_info, trigger +from ...utils.host import HostPage, get_self_netloc +from ...utils.render import render +from ...utils.render.schemas.base import Avatar +from ...utils.render.schemas.tetrio.user.info_v2 import Badge, Blitz, Sprint, Statistic, Zen +from ...utils.render.schemas.tetrio.user.info_v2 import Info as V2TemplateInfo +from ...utils.render.schemas.tetrio.user.info_v2 import User as V2TemplateUser +from ...utils.screenshot import screenshot +from ...utils.typing import Me +from .. import add_block_handlers, alc +from ..constant import CANT_VERIFY_MESSAGE +from . import command, get_player +from .api import Player +from .constant import GAME_TYPE +from .models import TETRIOUserConfig +from .typing import Template + +if TYPE_CHECKING: + from .api.schemas.summaries import SoloSuccessModel, ZenSuccessModel + from .api.schemas.user import User + from .api.schemas.user_info import UserInfoSuccess + +UTC = timezone.utc + +driver = get_driver() + +command.add( + Subcommand( + 'query', + Args( + Arg( + 'target', + parser(At | Me), + notice='@想要查询的人 / 自己', + flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL], + ), + Arg( + 'account', + parser(get_player), + notice='TETR.IO 用户名 / ID', + flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL], + ), + ), + Option( + '--template', + Arg('template', parser(Template)), + alias=['-T'], + help_text='要使用的查询模板', + ), + help_text='查询 TETR.IO 游戏信息', + ), +) + +alc.shortcut( + '(?i:io)(?i:查询|查|query|stats)', + command='tstats TETR.IO query', + humanized='io查', +) +alc.shortcut( + 'fkosk', + command='tstats TETR.IO query', + arguments=['我'], + fuzzy=False, + humanized='An Easter egg!', +) + +add_block_handlers(alc.assign('TETRIO.query')) + + +@alc.assign('TETRIO.query') +async def _( # noqa: PLR0913 + user: NBUser, + event: Event, + matcher: Matcher, + target: At | Me, + event_session: EventSession, + template: Template | None = None, +): + async with trigger( + session_persist_id=await get_session_persist_id(event_session), + game_platform=GAME_TYPE, + command_type='query', + command_args=[f'--default-template {template}'] if template is not None else [], + ): + async with get_session() as session: + bind = await query_bind_info( + session=session, + user=await get_user( + event_session.platform, target.target if isinstance(target, At) else event.get_user_id() + ), + game_platform=GAME_TYPE, + ) + if template is None: + template = await session.scalar( + select(TETRIOUserConfig.query_template).where(TETRIOUserConfig.id == user.id) + ) + if bind is None: + await matcher.finish('未查询到绑定信息') + message = UniMessage(CANT_VERIFY_MESSAGE) + player = Player(user_id=bind.game_account, trust=True) + await (message + UniMessage.image(raw=await make_query_image_v2(player))).finish() + + +@alc.assign('TETRIO.query') +async def _(user: NBUser, account: Player, event_session: EventSession, template: Template | None = None): + async with trigger( + session_persist_id=await get_session_persist_id(event_session), + game_platform=GAME_TYPE, + command_type='query', + command_args=[f'--default-template {template}'] if template is not None else [], + ): + async with get_session() as session: + if template is None: + template = await session.scalar( + select(TETRIOUserConfig.query_template).where(TETRIOUserConfig.id == user.id) + ) + await (UniMessage.image(raw=await make_query_image_v2(account))).finish() + + +N = TypeVar('N', int, float) + + +def handling_special_value(value: N) -> N | None: + return value if value != -1 else None + + +async def make_query_image_v2(player: Player) -> bytes: + user: User + user_info: UserInfoSuccess + sprint: SoloSuccessModel + blitz: SoloSuccessModel + zen: ZenSuccessModel + avatar_revision: int | None + banner_revision: int | None + # [todo) 有没有什么办法能让这类型推导成功) + user, user_info, sprint, blitz, zen, avatar_revision, banner_revision = await gather( # type: ignore[assignment] + player.user, + player.get_info(), + player.sprint, + player.blitz, + player.zen, + player.avatar_revision, + player.banner_revision, + ) + + if sprint.data.record is not None: + duration = timedelta(milliseconds=sprint.data.record.results.stats.finaltime).total_seconds() + sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004 + else: + sprint_value = 'N/A' + + play_time: str | None + if (game_time := handling_special_value(user_info.data.gametime)) is not None: + if game_time // 3600 > 0: + play_time = f'{game_time//3600:.0f}h {game_time % 3600 // 60:.0f}m {game_time % 60:.0f}s' + elif game_time // 60 > 0: + play_time = f'{game_time//60:.0f}m {game_time % 60:.0f}s' + else: + play_time = f'{game_time:.0f}s' + else: + play_time = game_time + netloc = get_self_netloc() + async with HostPage( + await render( + 'v2/tetrio/user/info', + V2TemplateInfo( + user=V2TemplateUser( + id=user.ID, + name=user.name.upper(), + bio=user_info.data.bio, + banner=f'http://{netloc}/host/resource/tetrio/banners/{user.ID}?{urlencode({"revision": banner_revision})}' + if banner_revision is not None and banner_revision != 0 + else None, + avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}' + if avatar_revision is not None and avatar_revision != 0 + else Avatar( + type='identicon', + hash=md5(user.ID.encode()).hexdigest(), # noqa: S324 + ), + badges=[ + Badge( + id=i.id, + description=i.label, + group=i.group, + receive_at=i.ts if isinstance(i.ts, datetime) else None, + ) + for i in user_info.data.badges + ], + country=user_info.data.country, + role=user_info.data.role, + xp=user_info.data.xp, + friend_count=user_info.data.friend_count, + supporter_tier=user_info.data.supporter_tier, + bad_standing=user_info.data.badstanding or False, + verified=user_info.data.verified, + playtime=play_time, + join_at=user_info.data.ts, + ), + tetra_league=None, + statistic=Statistic( + total=handling_special_value(user_info.data.gamesplayed), + wins=handling_special_value(user_info.data.gameswon), + ), + sprint=Sprint( + time=sprint_value, + global_rank=sprint.data.rank, + play_at=sprint.data.record.ts, + ) + if sprint.data.record is not None + else None, + blitz=Blitz( + score=blitz.data.record.results.stats.score, + global_rank=blitz.data.rank, + play_at=blitz.data.record.ts, + ) + if blitz.data.record is not None + else None, + zen=Zen(level=zen.data.level, score=zen.data.score), + ), + ), + ) as page_hash: + return await screenshot(f'http://{netloc}/host/{page_hash}.html')