mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
✨ TETR.IO 适配 v2模板
This commit is contained in:
@@ -1,17 +1,16 @@
|
||||
from asyncio import gather
|
||||
from collections import defaultdict
|
||||
from contextlib import suppress
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from hashlib import md5
|
||||
from math import ceil, floor
|
||||
from typing import ClassVar
|
||||
from typing import ClassVar, TypeVar, overload
|
||||
from urllib.parse import urlunparse
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from aiofiles import open
|
||||
from nonebot import get_driver
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.compat import type_validate_json
|
||||
from nonebot.compat import model_dump, type_validate_json
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
@@ -25,11 +24,18 @@ from sqlalchemy import select
|
||||
from zstandard import ZstdDecompressor
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.exception import FallbackError
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import TetrisMetricsProWithPPSVS, get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.base import Avatar, Ranking
|
||||
from ...utils.render.schemas.tetrio_info import Data, Info, Radar, TetraLeague, TetraLeagueHistory
|
||||
from ...utils.render.schemas.tetrio_info import User as TemplateUser
|
||||
from ...utils.render.schemas.tetrio_info import Data, Radar, TetraLeague, TetraLeagueHistory
|
||||
from ...utils.render.schemas.tetrio_info import Info as V1TemplateInfo
|
||||
from ...utils.render.schemas.tetrio_info import User as V1TemplateUser
|
||||
from ...utils.render.schemas.tetrio_info_v2 import Blitz, Sprint, Statistic, TetraLeagueStatistic, Zen
|
||||
from ...utils.render.schemas.tetrio_info_v2 import Info as V2TemplateInfo
|
||||
from ...utils.render.schemas.tetrio_info_v2 import TetraLeague as V2TemplateTetraLeague
|
||||
from ...utils.render.schemas.tetrio_info_v2 import User as V2TemplateUser
|
||||
from ...utils.screenshot import screenshot
|
||||
from ...utils.typing import Me, Number
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
@@ -38,9 +44,10 @@ from .api import Player, User, UserInfoSuccess
|
||||
from .api.models import TETRIOHistoricalData
|
||||
from .api.schemas.tetra_league import TetraLeagueSuccess
|
||||
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
|
||||
from .api.schemas.user_records import SoloModeRecord, SoloRecord
|
||||
from .api.schemas.user_records import SoloModeRecord, UserRecordsSuccess
|
||||
from .constant import GAME_TYPE, TR_MAX, TR_MIN
|
||||
from .model import IORank
|
||||
from .typing import Template
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
@@ -48,7 +55,13 @@ driver = get_driver()
|
||||
|
||||
|
||||
@alc.assign('TETRIO.query')
|
||||
async def _(event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
|
||||
async def _(
|
||||
event: Event,
|
||||
matcher: Matcher,
|
||||
target: At | Me,
|
||||
event_session: EventSession,
|
||||
template: Template | None = None,
|
||||
):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
@@ -67,30 +80,18 @@ async def _(event: Event, matcher: Matcher, target: At | Me, event_session: Even
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = UniMessage(CANT_VERIFY_MESSAGE)
|
||||
player = Player(user_id=bind.game_account, trust=True)
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
sprint = user_records.data.records.sprint
|
||||
blitz = user_records.data.records.blitz
|
||||
with suppress(TypeError):
|
||||
message.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
|
||||
await message.finish()
|
||||
message += make_query_text(user_info, sprint, blitz)
|
||||
await message.finish()
|
||||
await (message + (await make_query_result(player, template or 'v1'))).finish()
|
||||
|
||||
|
||||
@alc.assign('TETRIO.query')
|
||||
async def _(account: Player, event_session: EventSession):
|
||||
async def _(account: Player, event_session: EventSession, template: Template | None = None):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
user, user_info, user_records = await gather(account.user, account.get_info(), account.get_records())
|
||||
sprint = user_records.data.records.sprint
|
||||
blitz = user_records.data.records.blitz
|
||||
with suppress(TypeError):
|
||||
await UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)).finish()
|
||||
await make_query_text(user_info, sprint, blitz).finish()
|
||||
await (await make_query_result(account, template or 'v1')).finish()
|
||||
|
||||
|
||||
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
|
||||
@@ -212,13 +213,40 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[
|
||||
return histories
|
||||
|
||||
|
||||
async def make_query_image(
|
||||
user: User, user_info: UserInfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None
|
||||
) -> bytes:
|
||||
L = TypeVar('L', NeverPlayedLeague, NeverRatedLeague, RatedLeague)
|
||||
|
||||
|
||||
@overload
|
||||
def get_league(user_info: UserInfoSuccess, league_type: type[L]) -> L: ...
|
||||
@overload
|
||||
def get_league(
|
||||
user_info: UserInfoSuccess, league_type: None = None
|
||||
) -> NeverPlayedLeague | NeverRatedLeague | RatedLeague: ...
|
||||
def get_league(
|
||||
user_info: UserInfoSuccess, league_type: type[L] | None = None
|
||||
) -> L | NeverPlayedLeague | NeverRatedLeague | RatedLeague:
|
||||
league = user_info.data.user.league
|
||||
if not isinstance(league, RatedLeague) or league.vs is None:
|
||||
raise TypeError
|
||||
user_name = user_info.data.user.username.upper()
|
||||
if league_type is None:
|
||||
return league
|
||||
if isinstance(league, league_type):
|
||||
return league
|
||||
raise FallbackError
|
||||
|
||||
|
||||
def get_sprint(user_records: UserRecordsSuccess) -> SoloModeRecord:
|
||||
return user_records.data.records.sprint
|
||||
|
||||
|
||||
def get_blitz(user_records: UserRecordsSuccess) -> SoloModeRecord:
|
||||
return user_records.data.records.blitz
|
||||
|
||||
|
||||
async def make_query_image_v1(player: Player) -> bytes:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info, RatedLeague)
|
||||
sprint, blitz = get_sprint(user_records).record, get_blitz(user_records).record
|
||||
if league.vs is None:
|
||||
raise FallbackError
|
||||
histories = await query_historical_data(user, user_info)
|
||||
value_max, value_min = get_value_bounds([i.tr for i in histories])
|
||||
split_value, offset = get_split(value_max, value_min)
|
||||
@@ -229,17 +257,17 @@ async def make_query_image(
|
||||
sprint_value = 'N/A'
|
||||
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
|
||||
async with HostPage(
|
||||
await render(
|
||||
page=await render(
|
||||
'v1/tetrio/info',
|
||||
Info(
|
||||
user=TemplateUser(
|
||||
V1TemplateInfo(
|
||||
user=V1TemplateUser(
|
||||
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
|
||||
),
|
||||
name=user_name,
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.user.bio,
|
||||
),
|
||||
ranking=Ranking(
|
||||
@@ -280,9 +308,118 @@ async def make_query_image(
|
||||
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
|
||||
|
||||
def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: SoloModeRecord) -> UniMessage:
|
||||
league = user_info.data.user.league
|
||||
user_name = user_info.data.user.username.upper()
|
||||
N = TypeVar('N', int, float)
|
||||
|
||||
|
||||
def handling_special_value(value: N) -> N | None:
|
||||
return value if value != -1 else None
|
||||
|
||||
|
||||
async def make_query_image_v2(player: Player) -> bytes:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info)
|
||||
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
|
||||
|
||||
if sprint.record is not None:
|
||||
duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
else:
|
||||
sprint_value = 'N/A'
|
||||
|
||||
play_time: str | None
|
||||
if (game_time := handling_special_value(user_info.data.user.gametime)) is not None:
|
||||
if game_time // 3600 > 0:
|
||||
play_time = f'{game_time//3600:.0f}h {game_time % 3600 // 60:.0f}m {game_time % 60:.0f}s'
|
||||
elif game_time // 60 > 0:
|
||||
play_time = f'{game_time//60:.0f}m {game_time % 60:.0f}s'
|
||||
else:
|
||||
play_time = f'{game_time:.0f}s'
|
||||
else:
|
||||
play_time = game_time
|
||||
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v2/tetrio/info',
|
||||
V2TemplateInfo(
|
||||
user=V2TemplateUser(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.user.bio,
|
||||
banner=f'https://tetr.io/user-content/banners/{user_info.data.user.id}.jpg?rv={user_info.data.user.banner_revision}'
|
||||
if user_info.data.user.banner_revision is not None and user_info.data.user.banner_revision != 0
|
||||
else None,
|
||||
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
|
||||
),
|
||||
badges=[i.id for i in user_info.data.user.badges],
|
||||
country=user_info.data.user.country,
|
||||
xp=user_info.data.user.xp,
|
||||
friend_count=user_info.data.user.friend_count or 0,
|
||||
supporter_tier=user_info.data.user.supporter_tier,
|
||||
bad_standing=user_info.data.user.badstanding or False,
|
||||
verified=user_info.data.user.verified,
|
||||
playtime=play_time,
|
||||
join_at=user_info.data.user.ts,
|
||||
),
|
||||
tetra_league=V2TemplateTetraLeague(
|
||||
rank=league.rank,
|
||||
highest_rank=league.bestrank,
|
||||
tr=round(league.rating, 2),
|
||||
glicko=round(league.glicko, 2),
|
||||
rd=round(league.rd, 2),
|
||||
global_rank=handling_special_value(league.standing),
|
||||
country_rank=handling_special_value(league.standing_local),
|
||||
pps=(
|
||||
metrics := get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
|
||||
if league.vs is not None
|
||||
else get_metrics(pps=league.pps, apm=league.apm)
|
||||
).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
|
||||
adpl=metrics.adpl if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
|
||||
statistic=TetraLeagueStatistic(
|
||||
total=league.gamesplayed,
|
||||
wins=league.gameswon,
|
||||
),
|
||||
)
|
||||
if isinstance(league, RatedLeague)
|
||||
else None,
|
||||
statistic=Statistic(
|
||||
total=handling_special_value(user_info.data.user.gamesplayed),
|
||||
wins=handling_special_value(user_info.data.user.gameswon),
|
||||
),
|
||||
sprint=Sprint(
|
||||
time=sprint_value,
|
||||
global_rank=sprint.rank,
|
||||
play_at=sprint.record.ts,
|
||||
)
|
||||
if sprint.record is not None
|
||||
else None,
|
||||
blitz=Blitz(
|
||||
score=blitz.record.endcontext.score,
|
||||
global_rank=blitz.rank,
|
||||
play_at=blitz.record.ts,
|
||||
)
|
||||
if blitz.record is not None
|
||||
else None,
|
||||
zen=Zen.model_validate(model_dump(user_records.data.zen)),
|
||||
),
|
||||
),
|
||||
) as page_hash:
|
||||
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
|
||||
|
||||
async def make_query_text(player: Player) -> UniMessage:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info)
|
||||
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
|
||||
|
||||
user_name = user.name.upper()
|
||||
|
||||
message = ''
|
||||
if isinstance(league, NeverPlayedLeague):
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
@@ -295,12 +432,15 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S
|
||||
else:
|
||||
message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})'
|
||||
message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:'
|
||||
lpm = league.pps * 24
|
||||
message += f"\nL'PM: {round(lpm, 2)} ( {league.pps} pps )"
|
||||
message += f'\nAPM: {league.apm} ( x{round(league.apm/lpm,2)} )'
|
||||
if league.vs is not None:
|
||||
adpm = league.vs * 0.6
|
||||
message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )'
|
||||
metrics = (
|
||||
get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
|
||||
if league.vs is not None
|
||||
else get_metrics(pps=league.pps, apm=league.apm)
|
||||
)
|
||||
message += f"\nL'PM: {metrics.lpm} ( {metrics.pps} pps )"
|
||||
message += f'\nAPM: {metrics.apm} ( x{metrics.apl} )'
|
||||
if isinstance(metrics, TetrisMetricsProWithPPSVS):
|
||||
message += f'\nADPM: {metrics.adpm} ( x{metrics.adpl} ) ( {metrics.vs}vs )'
|
||||
if sprint.record is not None:
|
||||
message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
|
||||
message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
|
||||
@@ -310,6 +450,17 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S
|
||||
return UniMessage(message)
|
||||
|
||||
|
||||
async def make_query_result(player: Player, template: Template) -> UniMessage:
|
||||
try:
|
||||
if template == 'v1':
|
||||
return UniMessage.image(raw=await make_query_image_v1(player))
|
||||
if template == 'v2':
|
||||
return UniMessage.image(raw=await make_query_image_v2(player))
|
||||
except FallbackError:
|
||||
...
|
||||
return await make_query_text(player)
|
||||
|
||||
|
||||
class FullExport:
|
||||
cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set)
|
||||
latest_update: ClassVar[date | None] = None
|
||||
|
||||
Reference in New Issue
Block a user