🎨 重命名一些模块

This commit is contained in:
2024-05-16 05:59:18 +08:00
parent e8539c15cc
commit 3ef7605e11
42 changed files with 6 additions and 10 deletions

View File

@@ -0,0 +1,40 @@
from typing import Any
from nonebot.adapters import Bot
from nonebot.exception import FinishedException
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor
from nonebot_plugin_alconna import AlcMatches, AlconnaMatcher, At
from ..utils.exception import MessageFormatError, NeedCatchError
def add_default_handlers(matcher: type[AlconnaMatcher]) -> None:
@matcher.assign('query')
async def _(bot: Bot, matcher: Matcher, target: At):
if isinstance(target, At) and target.target == bot.self_id:
await matcher.finish('不能查询bot的信息')
@matcher.handle()
async def _(matcher: Matcher, account: MessageFormatError):
await matcher.finish(str(account))
@matcher.handle()
async def _(matcher: Matcher, matches: AlcMatches):
if matches.head_matched and matches.options != {} or matches.main_args == {}:
await matcher.finish(
(f'{matches.error_info!r}\n' if matches.error_info is not None else '')
+ f'输入"{matches.header_result} --help"查看帮助'
)
@matcher.handle()
def _(other: Any): # noqa: ANN401, ARG001
raise FinishedException
from . import tetrio, top, tos # noqa: F401, E402
@run_postprocessor
async def _(matcher: Matcher, exception: NeedCatchError):
await matcher.send(str(exception))

View File

@@ -0,0 +1,3 @@
BIND_COMMAND: list[str] = ['绑定', 'bind']
QUERY_COMMAND: list[str] = ['', '查询', 'query', 'stats']
CANT_VERIFY_MESSAGE = '* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n'

View File

@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from pydantic import BaseModel
from ..utils.typing import GameType
class Base(BaseModel):
platform: GameType
class BaseUser(ABC, Base):
"""游戏用户"""
def __eq__(self, __value: object) -> bool:
if isinstance(__value, BaseUser):
return self.unique_identifier == __value.unique_identifier
return False
@property
@abstractmethod
def unique_identifier(self) -> str:
raise NotImplementedError

View File

@@ -0,0 +1,85 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .api import Player
from .api.typing import Rank
from .constant import USER_ID, USER_NAME
def get_player(user_id_or_name: str) -> Player | MessageFormatError:
if USER_ID.match(user_id_or_name):
return Player(user_id=user_id_or_name, trust=True)
if USER_NAME.match(user_id_or_name):
return Player(user_name=user_id_or_name, trust=True)
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'io',
Option(
BIND_COMMAND[0],
Args(
Arg(
'account',
get_player,
notice='IO 用户名 / ID',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 IO 账号',
),
Option(
QUERY_COMMAND[0],
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
'account',
get_player,
notice='IO 用户名 / ID',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 IO 游戏信息',
),
Option(
'rank',
Args(Arg('rank', Rank, notice='IO 段位')),
alias={'Rank', 'RANK', '段位'},
compact=True,
dest='rank',
help_text='查询 IO 段位信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TETR.IO 的信息',
example='io绑定scdhh\nio查我\niorankx',
compact=True,
fuzzy_match=True,
),
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'IO'},
)
alc.shortcut('fkosk', {'command': 'io查', 'args': [''], 'fuzzy': False, 'humanized': 'An Easter egg!'})
from . import bind, query, rank # noqa: F401, E402
add_default_handlers(alc)

View File

@@ -0,0 +1,7 @@
from .player import Player
from .schemas.user import User
from .schemas.user_info import UserInfoSuccess
from .schemas.user_records import UserRecordsSuccess
from .tetra_league import full_export as tetra_league_full_export
__all__ = ['Player', 'User', 'UserInfoSuccess', 'UserRecordsSuccess', 'tetra_league_full_export']

View File

@@ -0,0 +1,35 @@
from asyncio import Lock
from datetime import datetime, timezone
from typing import ClassVar
from weakref import WeakValueDictionary
from aiocache import Cache as ACache # type: ignore[import-untyped]
from nonebot.compat import type_validate_json
from nonebot.log import logger
from ....utils.request import Request
from .schemas.base import FailedModel, SuccessModel
UTC = timezone.utc
class Cache:
cache = ACache(ACache.MEMORY)
task: ClassVar[WeakValueDictionary[str, Lock]] = WeakValueDictionary()
@classmethod
async def get(cls, url: str) -> bytes:
lock = cls.task.setdefault(url, Lock())
async with lock:
if (cached_data := await cls.cache.get(url)) is not None:
logger.debug(f'{url}: Cache hit!')
return cached_data
response_data = await Request.request(url)
parsed_data: SuccessModel | FailedModel = type_validate_json(SuccessModel | FailedModel, response_data) # type: ignore[arg-type]
if isinstance(parsed_data, SuccessModel):
await cls.cache.add(
url,
response_data,
(parsed_data.cache.cached_until - datetime.now(UTC)).total_seconds(),
)
return response_data

View File

@@ -0,0 +1,17 @@
from datetime import datetime
from typing import Literal
from nonebot_plugin_orm import Model
from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from ....db.models import PydanticType
from .schemas.base import SuccessModel
class TETRIOHistoricalData(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
user_unique_identifier: Mapped[str] = mapped_column(String(24), index=True)
api_type: Mapped[Literal['User Info', 'User Records']] = mapped_column(String(16), index=True)
data: Mapped[SuccessModel] = mapped_column(PydanticType(get_model=[SuccessModel.__subclasses__], models=set()))
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)

View File

@@ -0,0 +1,102 @@
from typing import overload
from nonebot.compat import type_validate_json
from ....db import anti_duplicate_add
from ....utils.exception import RequestError
from ....utils.request import splice_url
from ..constant import BASE_URL, USER_ID, USER_NAME
from .cache import Cache
from .models import TETRIOHistoricalData
from .schemas.base import FailedModel
from .schemas.user import User
from .schemas.user_info import UserInfo, UserInfoSuccess
from .schemas.user_records import UserRecords, UserRecordsSuccess
class Player:
@overload
def __init__(self, *, user_id: str, trust: bool = False): ...
@overload
def __init__(self, *, user_name: str, trust: bool = False): ...
def __init__(self, *, user_id: str | None = None, user_name: str | None = None, trust: bool = False):
self.user_id = user_id
self.user_name = user_name
if not trust:
if self.user_id is not None:
if not USER_ID.match(self.user_id):
msg = 'Invalid user id'
raise ValueError(msg)
elif self.user_name is not None:
if not USER_NAME.match(self.user_name):
msg = 'Invalid user name'
raise ValueError(msg)
else:
msg = 'Invalid user'
raise ValueError(msg)
self.__user: User | None = None
self._user_info: UserInfoSuccess | None = None
self._user_records: UserRecordsSuccess | None = None
@property
def _request_user_parameter(self) -> str:
if self.user_id is not None:
return self.user_id
if self.user_name is not None:
return self.user_name.lower()
msg = 'Invalid user'
raise ValueError(msg)
@property
async def user(self) -> User:
if self.__user is None:
user_info = await self.get_info()
self.__user = User(
ID=user_info.data.user.id,
name=user_info.data.user.username,
)
self.user_id = user_info.data.user.id
self.user_name = user_info.data.user.username
return self.__user
async def get_info(self) -> UserInfoSuccess:
"""Get User Info"""
if self._user_info is None:
raw_user_info = await Cache.get(splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}']))
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
if isinstance(user_info, FailedModel):
msg = f'用户信息请求错误:\n{user_info.error}'
raise RequestError(msg)
self._user_info = user_info
await anti_duplicate_add(
TETRIOHistoricalData,
TETRIOHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type='User Info',
data=user_info,
update_time=user_info.cache.cached_at,
),
)
return self._user_info
async def get_records(self) -> UserRecordsSuccess:
"""Get User Records"""
if self._user_records is None:
raw_user_records = await Cache.get(
splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}/', 'records'])
)
user_records: UserRecords = type_validate_json(UserRecords, raw_user_records) # type: ignore[arg-type]
if isinstance(user_records, FailedModel):
msg = f'用户Solo数据请求错误:\n{user_records.error}'
raise RequestError(msg)
self._user_records = user_records
await anti_duplicate_add(
TETRIOHistoricalData,
TETRIOHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type='User Records',
data=user_records,
update_time=user_records.cache.cached_at,
),
)
return self._user_records

View File

@@ -0,0 +1,20 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
class Cache(BaseModel):
status: str
cached_at: datetime
cached_until: datetime
class SuccessModel(BaseModel):
success: Literal[True]
cache: Cache
class FailedModel(BaseModel):
success: Literal[False]
error: str

View File

@@ -0,0 +1,59 @@
from pydantic import BaseModel, Field
from ..typing import Rank
from .base import FailedModel
from .base import SuccessModel as BaseSuccessModel
class _User(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: str
xp: float
supporter: bool
verified: bool
country: str | None = None
class _League(BaseModel):
gamesplayed: int
gameswon: int
rating: float
rank: Rank
bestrank: Rank
decaying: bool
class ValidLeague(_League):
glicko: float
rd: float
apm: float
pps: float
vs: float
class ValidUser(_User):
league: ValidLeague
class InvalidLeague(_League):
glicko: float | None = None
rd: float | None = None
apm: float | None = None
pps: float | None = None
vs: float | None = None
class InvalidUser(_User):
league: InvalidLeague
class Data(BaseModel):
users: list[ValidUser | InvalidUser]
class TetraLeagueSuccess(BaseSuccessModel):
data: Data
TetraLeague = TetraLeagueSuccess | FailedModel

View File

@@ -0,0 +1,18 @@
from typing import Literal
from typing_extensions import override
from ....schemas import BaseUser
from ...constant import GAME_TYPE
class User(BaseUser):
platform: Literal['IO'] = GAME_TYPE
ID: str
name: str
@property
@override
def unique_identifier(self) -> str:
return self.ID

View File

@@ -0,0 +1,133 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, Field
from ..typing import Rank
from .base import FailedModel
from .base import SuccessModel as BaseSuccessModel
class Badge(BaseModel):
id: str
label: str
group: str | None = None
ts: datetime | Literal[False] | None = None
class MetaLeague(BaseModel):
decaying: bool
class NeverPlayedLeague(MetaLeague):
gamesplayed: Literal[0]
gameswon: Literal[0]
rating: Literal[-1]
rank: Literal['z']
standing: Literal[-1]
standing_local: Literal[-1]
next_rank: None
prev_rank: None
next_at: Literal[-1]
prev_at: Literal[-1]
percentile: Literal[-1]
percentile_rank: Literal['z']
apm: None = None
pps: None = None
vs: None = None
class NeverRatedLeague(MetaLeague):
gamesplayed: Literal[1, 2, 3, 4, 5, 6, 7, 8, 9]
gameswon: int
rating: Literal[-1]
rank: Literal['z']
standing: Literal[-1]
standing_local: Literal[-1]
next_rank: None
prev_rank: None
next_at: Literal[-1]
prev_at: Literal[-1]
percentile: Literal[-1]
percentile_rank: Literal['z']
apm: float
pps: float
vs: float | None = None
class RatedLeague(MetaLeague):
gamesplayed: int
gameswon: int
rating: float
rank: Rank
bestrank: Rank
standing: int
standing_local: int
next_rank: Rank | None = None
prev_rank: Rank | None = None
next_at: int
prev_at: int
percentile: float
percentile_rank: str
glicko: float
rd: float
apm: float
pps: float
vs: float | None = None
class Discord(BaseModel):
id: str
username: str
class Connections(BaseModel):
discord: Discord | None = None
class Distinguishment(BaseModel):
type: str
class User(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop', 'banned']
ts: datetime | None = None
botmaster: str | None = None
badges: list[Badge]
xp: float
gamesplayed: int
gameswon: int
gametime: float
country: str | None = None
badstanding: bool | None = None
supporter: bool | None = None # osk说是必有, 但实际上不是 fkosk
supporter_tier: int
verified: bool
league: NeverPlayedLeague | NeverRatedLeague | RatedLeague
avatar_revision: int | None = None
"""This user's avatar ID. Get their avatar at
https://tetr.io/user-content/avatars/{ USERID }.jpg?rv={ AVATAR_REVISION }"""
banner_revision: int | None = None
"""This user's banner ID. Get their banner at
https://tetr.io/user-content/banners/{ USERID }.jpg?rv={ BANNER_REVISION }
Ignore this field if the user is not a supporter."""
bio: str | None = None
connections: Connections
friend_count: int | None = None
distinguishment: Distinguishment | None = None
class Data(BaseModel):
user: User
class UserInfoSuccess(BaseSuccessModel):
data: Data
UserInfo = UserInfoSuccess | FailedModel

View File

@@ -0,0 +1,122 @@
from datetime import datetime
from pydantic import BaseModel, Field
from .....utils.typing import Number
from .base import FailedModel
from .base import SuccessModel as BaseSuccessModel
class Time(BaseModel):
start: int
zero: bool
locked: bool
prev: int
frameoffset: int | None = None
class Clears(BaseModel):
singles: int
doubles: int
triples: int
quads: int
pentas: int | None = None
realtspins: int
minitspins: int
minitspinsingles: int
tspinsingles: int
minitspindoubles: int
tspindoubles: int
tspintriples: int
tspinquads: int
allclear: int
class Garbage(BaseModel):
sent: int
received: int
attack: int | None = None
cleared: int | None = None
class Finesse(BaseModel):
combo: int
faults: int
perfectpieces: int
class EndContext(BaseModel):
seed: Number
lines: int
level_lines: int
level_lines_needed: int
inputs: int
holds: int | None = None
time: Time
score: int
zenlevel: int | None = None
zenprogress: int | None = None
level: int
combo: int
currentcombopower: int | None = None # WTF
topcombo: int
btb: int
topbtb: int
currentbtbchainpower: int | None = None # WTF * 2
tspins: int
piecesplaced: int
clears: Clears
garbage: Garbage
kills: int
finesse: Finesse
final_time: float = Field(..., alias='finalTime')
gametype: str
class _User(BaseModel):
id: str = Field(..., alias='_id')
username: str
class _Record(BaseModel):
id: str = Field(..., alias='_id')
stream: str
replayid: str
user: _User
ts: datetime
ismulti: bool | None = None
class SoloRecord(_Record):
endcontext: EndContext
class MultiRecord(_Record):
endcontext: list[EndContext]
class SoloModeRecord(BaseModel):
record: SoloRecord
rank: int | None = None
class Records(BaseModel):
sprint: SoloModeRecord = Field(..., alias='40l')
blitz: SoloModeRecord
class Zen(BaseModel):
level: int
score: int
class Data(BaseModel):
records: Records
zen: Zen
class UserRecordsSuccess(BaseSuccessModel):
data: Data
UserRecords = UserRecordsSuccess | FailedModel

View File

@@ -0,0 +1,36 @@
from typing import Literal, NamedTuple, overload
from nonebot.compat import type_validate_json
from ....utils.exception import RequestError
from ....utils.request import splice_url
from ..constant import BASE_URL
from .cache import Cache
from .schemas.base import FailedModel
from .schemas.tetra_league import TetraLeague, TetraLeagueSuccess
class FullExport(NamedTuple):
model: TetraLeagueSuccess
original: bytes
@overload
async def full_export(*, with_original: Literal[False]) -> TetraLeagueSuccess: ...
@overload
async def full_export(*, with_original: Literal[True]) -> FullExport: ...
async def full_export(*, with_original: bool) -> TetraLeagueSuccess | FullExport:
full: TetraLeague = type_validate_json(
TetraLeague, # type: ignore[arg-type]
(data := await Cache.get(splice_url([BASE_URL, 'users/lists/league/all']))),
)
if isinstance(full, FailedModel):
msg = f'排行榜数据请求错误:\n{full.error}'
raise RequestError(msg)
if with_original:
return FullExport(full, data)
return full

View File

@@ -0,0 +1,22 @@
from typing import Literal
Rank = Literal[
'x',
'u',
'ss',
's+',
's',
's-',
'a+',
'a',
'a-',
'b+',
'b',
'b-',
'c+',
'c',
'c-',
'd+',
'd',
'z', # 未定级
]

View File

@@ -0,0 +1,64 @@
from hashlib import md5
from urllib.parse import urlunparse
from nonebot.adapters import Bot, Event
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.platform import get_platform
from ...utils.render import Bind, render
from ...utils.render.schemas.base import Avatar, People
from ...utils.screenshot import screenshot
from . import alc
from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
async def _(bot: Bot, event: Event, account: Player, event_session: EventSession, bot_info: UserInfo = BotUserInfo()): # noqa: B008
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='bind',
command_args=[],
):
user = await account.user
async with get_session() as session:
bind_status = await create_or_update_bind(
session=session,
chat_platform=get_platform(bot),
chat_account=event.get_user_id(),
game_platform=GAME_TYPE,
game_account=user.unique_identifier,
)
user_info = await account.get_info()
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
async with HostPage(
await render(
'binding',
Bind(
platform='TETR.IO',
status='unknown',
user=People(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
if user_info.data.user.avatar_revision is not None
else Avatar(type='identicon', hash=md5(user_info.data.user.id.encode()).hexdigest()), # noqa: S324
name=user_info.data.user.username.upper(),
),
bot=People(
avatar=await get_avatar(bot_info, 'Data URI', '../../static/logo/logo.svg'),
name=bot_info.user_name,
),
command='io查我',
),
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
).finish()

View File

@@ -0,0 +1,34 @@
from re import compile
from typing import Literal
from .api.typing import Rank
GAME_TYPE: Literal['IO'] = 'IO'
BASE_URL = 'https://ch.tetr.io/api/'
RANK_PERCENTILE: dict[Rank, float] = {
'x': 1,
'u': 5,
'ss': 11,
's+': 17,
's': 23,
's-': 30,
'a+': 38,
'a': 46,
'a-': 54,
'b+': 62,
'b': 70,
'b-': 78,
'c+': 84,
'c': 90,
'c-': 95,
'd+': 97.5,
'd': 100,
}
TR_MIN = 0
TR_MAX = 25000
USER_ID = compile(r'^[a-f0-9]{24}$')
USER_NAME = compile(r'^[a-zA-Z0-9_-]{3,16}$')

View File

@@ -0,0 +1,28 @@
from datetime import datetime
from nonebot_plugin_orm import Model
from sqlalchemy import JSON, DateTime, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from .api.typing import Rank
class IORank(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
rank: Mapped[Rank] = mapped_column(String(2), index=True)
tr_line: Mapped[float]
player_count: Mapped[int]
low_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
low_apm: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
low_vs: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
avg_pps: Mapped[float]
avg_apm: Mapped[float]
avg_vs: Mapped[float]
high_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
high_apm: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
high_vs: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
update_time: Mapped[datetime] = mapped_column(
DateTime,
index=True,
)
file_hash: Mapped[str | None] = mapped_column(String(128), index=True)

View File

@@ -0,0 +1,388 @@
import contextlib
from asyncio import gather
from collections import defaultdict
from datetime import date, datetime, timedelta, timezone
from hashlib import md5
from math import ceil, floor
from typing import ClassVar
from urllib.parse import urlunparse
from zoneinfo import ZoneInfo
from aiofiles import open
from nonebot import get_driver
from nonebot.adapters import Bot, Event
from nonebot.compat import type_validate_json
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped]
from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped]
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from sqlalchemy import select
from zstandard import ZstdDecompressor
from ...db import query_bind_info, trigger
from ...utils.host import HostPage, get_self_netloc
from ...utils.platform import get_platform
from ...utils.render import TETRIOInfo, render
from ...utils.render.schemas.base import Avatar
from ...utils.render.schemas.tetrio_info import Data, Radar, Ranking, TetraLeague, TetraLeagueHistory
from ...utils.render.schemas.tetrio_info import User as TemplateUser
from ...utils.screenshot import screenshot
from ...utils.typing import Me, Number
from ..constant import CANT_VERIFY_MESSAGE
from . import alc
from .api import Player, User, UserInfoSuccess
from .api.models import TETRIOHistoricalData
from .api.schemas.tetra_league import TetraLeagueSuccess
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
from .api.schemas.user_records import SoloModeRecord, SoloRecord
from .constant import GAME_TYPE, TR_MAX, TR_MIN
from .model import IORank
UTC = timezone.utc
driver = get_driver()
@alc.assign('query')
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
async with get_session() as session:
bind = await query_bind_info(
session=session,
chat_platform=get_platform(bot),
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
game_platform=GAME_TYPE,
)
if bind is None:
await matcher.finish('未查询到绑定信息')
message = UniMessage(CANT_VERIFY_MESSAGE)
player = Player(user_id=bind.game_account, trust=True)
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
with contextlib.suppress(TypeError):
message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
await message.finish()
message += make_query_text(user_info, sprint, blitz)
await message.finish()
@alc.assign('query')
async def _(account: Player, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
user, user_info, user_records = await gather(account.user, account.get_info(), account.get_records())
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
with contextlib.suppress(TypeError):
await UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)).finish()
await make_query_text(user_info, sprint, blitz).finish()
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
value_max = 10 * ceil(max(values) / 10)
value_min = 10 * floor(min(values) / 10)
return value_max, value_min
def get_split(value_max: int, value_min: int) -> tuple[int, int]:
offset = 0
overflow = 0
while True:
if (new_max_value := value_max + offset + overflow) > TR_MAX:
overflow -= 1
continue
if (new_min_value := value_min - offset + overflow) < TR_MIN:
overflow += 1
continue
if ((new_max_value - new_min_value) / 40).is_integer():
split_value = int((value_max + offset - (value_min - offset)) / 4)
break
offset += 1
return split_value, offset + overflow
def get_specified_point(
previous_point: Data,
behind_point: Data,
point_time: datetime,
) -> Data:
"""根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据
Args:
previous_point (Data): 前面的数据点
behind_point (Data): 后面的数据点
point_time (datetime): 要推算的点的位置
Returns:
Data: 要推算的点的数据
"""
# 求两个点的斜率
slope = (behind_point.tr - previous_point.tr) / (
datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at)
)
return Data(
record_at=point_time,
tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)),
)
async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[Data]:
today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
forward = timedelta(days=9)
start_time = (today - forward).astimezone(UTC)
async with get_session() as session:
historical_data = (
await session.scalars(
select(TETRIOHistoricalData)
.where(TETRIOHistoricalData.update_time >= start_time)
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
.where(TETRIOHistoricalData.api_type == 'User Info')
)
).all()
if historical_data:
extra = (
await session.scalars(
select(TETRIOHistoricalData)
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
.where(TETRIOHistoricalData.api_type == 'User Info')
.order_by(TETRIOHistoricalData.id.desc())
.where(TETRIOHistoricalData.id < min([i.id for i in historical_data]))
.limit(1)
)
).one_or_none()
if extra is not None:
historical_data = list(historical_data)
historical_data.append(extra)
full_export_data = FullExport.get_data(user.unique_identifier)
if not historical_data and not full_export_data:
return [
Data(record_at=today - forward, tr=user_info.data.user.league.rating),
Data(record_at=today.replace(microsecond=1000), tr=user_info.data.user.league.rating),
]
histories = [
Data(
record_at=i.update_time.astimezone(ZoneInfo('Asia/Shanghai')),
tr=i.data.data.user.league.rating,
)
for i in historical_data
if isinstance(i.data, UserInfoSuccess) and isinstance(i.data.data.user.league, RatedLeague)
] + full_export_data
# 按照时间排序
histories = sorted(histories, key=lambda x: x.record_at)
for index, value in enumerate(histories):
# 在历史记录里找有没有今天0点后的数据, 并且至少要有两个数据点
if value.record_at > today and len(histories) >= 2: # noqa: PLR2004
histories = histories[:index] + [
get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000))
]
break
else:
histories.append(
get_specified_point(
histories[-1],
Data(record_at=user_info.cache.cached_at, tr=user_info.data.user.league.rating),
today.replace(microsecond=1000),
)
)
if histories[0].record_at < (today - forward):
histories[0] = get_specified_point(
histories[0],
histories[1],
today - forward,
)
else:
histories.insert(0, Data(record_at=today - forward, tr=histories[0].tr))
return histories
async def make_query_image(
user: User, user_info: UserInfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None
) -> bytes:
league = user_info.data.user.league
if not isinstance(league, RatedLeague) or league.vs is None:
raise TypeError
user_name = user_info.data.user.username.upper()
histories = await query_historical_data(user, user_info)
value_max, value_min = get_value_bounds([i.tr for i in histories])
split_value, offset = get_split(value_max, value_min)
if sprint is not None:
duration = timedelta(milliseconds=sprint.endcontext.final_time).total_seconds()
sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004
else:
sprint_value = 'N/A'
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
async with HostPage(
await render(
'tetrio/info',
TETRIOInfo(
user=TemplateUser(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
if user_info.data.user.avatar_revision is not None
else Avatar(
type='identicon',
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
),
name=user_name,
bio=user_info.data.user.bio,
),
ranking=Ranking(
rating=round(league.glicko, 2),
rd=round(league.rd, 2),
),
tetra_league=TetraLeague(
rank=league.rank,
tr=round(league.rating, 2),
global_rank=league.standing,
pps=league.pps,
lpm=round(lpm := (league.pps * 24), 2),
apm=league.apm,
apl=round(league.apm / lpm, 2),
vs=league.vs,
adpm=round(adpm := (league.vs * 0.6), 2),
adpl=round(adpm / lpm, 2),
),
tetra_league_history=TetraLeagueHistory(
data=histories,
split_interval=split_value,
min_tr=value_min,
max_tr=value_max,
offset=offset,
),
radar=Radar(
app=(app := (league.apm / (60 * league.pps))),
dsps=(dsps := ((league.vs / 100) - (league.apm / 60))),
dspp=(dspp := (dsps / league.pps)),
ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25,
ge=2 * ((app * dsps) / league.pps),
),
sprint=sprint_value,
blitz=blitz_value,
),
)
) as page_hash:
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: SoloModeRecord) -> UniMessage:
league = user_info.data.user.league
user_name = user_info.data.user.username.upper()
message = ''
if isinstance(league, NeverPlayedLeague):
message += f'用户 {user_name} 没有排位统计数据'
else:
if isinstance(league, NeverRatedLeague):
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
else:
if league.rank == 'z':
message += f'用户 {user_name} 暂无段位, {round(league.rating,2)} TR'
else:
message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})'
message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:'
lpm = league.pps * 24
message += f"\nL'PM: {round(lpm, 2)} ( {league.pps} pps )"
message += f'\nAPM: {league.apm} ( x{round(league.apm/lpm,2)} )'
if league.vs is not None:
adpm = league.vs * 0.6
message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )'
if sprint.record is not None:
message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
if blitz.record is not None:
message += f'\nBlitz: {blitz.record.endcontext.score}'
message += f' ( #{blitz.rank} )' if blitz.rank is not None else ''
return UniMessage(message)
class FullExport:
cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set)
latest_update: ClassVar[date | None] = None
@classmethod
async def init(cls) -> None:
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.update_time >= cls.start_time()))).all()
await gather(
*[
cls._load(update_time, file_hash)
for file_hash, update_time in {
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
}.items()
]
)
@classmethod
async def update(cls) -> None:
if cls.latest_update == datetime.now(tz=ZoneInfo('Asia/Shanghai')).date():
return
start_time = cls.start_time()
for i in cls.cache:
cls.cache[i] = {j for j in cls.cache[i] if j[0] >= start_time}
latest_time = max(cls.cache)
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.update_time > latest_time))).all()
await gather(
*[
cls._load(update_time, file_hash)
for file_hash, update_time in {
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
}.items()
]
)
cls.latest_update = datetime.now(tz=ZoneInfo('Asia/Shanghai')).date()
@classmethod
def get_data(cls, unique_identifier: str) -> list[Data]:
return [Data(record_at=i[0], tr=i[1]) for i in cls.cache[unique_identifier]]
@classmethod
def start_time(cls) -> datetime:
return (
datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
- timedelta(days=9)
).astimezone(UTC)
@classmethod
async def _load(cls, update_time: datetime, file_hash: str) -> None:
try:
users = type_validate_json(TetraLeagueSuccess, await cls.decompress(file_hash)).data.users
except FileNotFoundError:
await cls.clear_invalid(file_hash)
return
update_time = update_time.astimezone(ZoneInfo('Asia/Shanghai'))
for i in users:
cls.cache[i.id].add((update_time, i.league.rating))
@classmethod
async def decompress(cls, file_hash: str) -> bytes:
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{file_hash}.json.zst'), mode='rb') as file:
return ZstdDecompressor().decompress(await file.read())
@classmethod
async def clear_invalid(cls, file_hash: str) -> None:
async with get_session() as session:
full_exports = (await session.scalars(select(IORank).where(IORank.file_hash == file_hash))).all()
for i in full_exports:
i.file_hash = None
await session.commit()
@driver.on_startup
async def _():
await FullExport.init()
scheduler.add_job(FullExport.update, 'interval', hours=1)

View File

@@ -0,0 +1,174 @@
from collections import defaultdict
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from hashlib import sha512
from math import floor
from statistics import mean
from zoneinfo import ZoneInfo
from aiofiles import open
from nonebot import get_driver
from nonebot.compat import model_dump
from nonebot.matcher import Matcher
from nonebot.utils import run_sync
from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped]
from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped]
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from sqlalchemy import func, select
from zstandard import ZstdCompressor
from ...db import trigger
from ...utils.exception import RequestError
from ...utils.metrics import get_metrics
from ...utils.retry import retry
from . import alc
from .api.schemas.base import FailedModel
from .api.schemas.tetra_league import ValidUser
from .api.schemas.user import User
from .api.tetra_league import full_export
from .api.typing import Rank
from .constant import GAME_TYPE, RANK_PERCENTILE
from .model import IORank
UTC = timezone.utc
driver = get_driver()
@alc.assign('rank')
async def _(matcher: Matcher, rank: Rank, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='rank',
command_args=[],
):
if rank == 'z':
await matcher.finish('暂不支持查询未知段位')
async with get_session() as session:
latest_data = (
await session.scalars(select(IORank).where(IORank.rank == rank).order_by(IORank.id.desc()).limit(1))
).one()
compare_data = (
await session.scalars(
select(IORank)
.where(IORank.rank == rank)
.order_by(
func.abs(
func.julianday(IORank.update_time)
- func.julianday(latest_data.update_time - timedelta(hours=24))
)
)
.limit(1)
)
).one()
message = ''
if (datetime.now(UTC) - latest_data.update_time.replace(tzinfo=UTC)) > timedelta(hours=7):
message += 'Warning: 数据超过7小时未更新, 请联系Bot主人查看后台\n'
message += f'{rank.upper()} 段 分数线 {latest_data.tr_line:.2f} TR, {latest_data.player_count} 名玩家\n'
if compare_data.id != latest_data.id:
message += f'对比 {(latest_data.update_time-compare_data.update_time).total_seconds()/3600:.2f} 小时前趋势: {f"{difference:.2f}" if (difference:=latest_data.tr_line-compare_data.tr_line) > 0 else f"{-difference:.2f}" if difference < 0 else ""}'
else:
message += '暂无对比数据'
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
low_pps = get_metrics(pps=latest_data.low_pps[1])
low_vs = get_metrics(vs=latest_data.low_vs[1])
max_pps = get_metrics(pps=latest_data.high_pps[1])
max_vs = get_metrics(vs=latest_data.high_vs[1])
message += (
'\n'
'平均数据:\n'
f"L'PM: {avg.lpm} ( {avg.pps} pps )\n"
f'APM: {avg.apm} ( x{avg.apl} )\n'
f'ADPM: {avg.adpm} ( x{avg.adpl} ) ( {avg.vs}vs )\n'
'\n'
'最低数据:\n'
f"L'PM: {low_pps.lpm} ( {low_pps.pps} pps ) By: {latest_data.low_pps[0]['name'].upper()}\n"
f'APM: {latest_data.low_apm[1]} By: {latest_data.low_apm[0]["name"].upper()}\n'
f'ADPM: {low_vs.adpm} ( {low_vs.vs}vs ) By: {latest_data.low_vs[0]["name"].upper()}\n'
'\n'
'最高数据:\n'
f"L'PM: {max_pps.lpm} ( {max_pps.pps} pps ) By: {latest_data.high_pps[0]['name'].upper()}\n"
f'APM: {latest_data.high_apm[1]} By: {latest_data.high_apm[0]["name"].upper()}\n'
f'ADPM: {max_vs.adpm} ( {max_vs.vs}vs ) By: {latest_data.high_vs[0]["name"].upper()}\n'
'\n'
f'数据更新时间: {latest_data.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")}'
)
await matcher.finish(message)
@scheduler.scheduled_job('cron', hour='0,6,12,18', minute=0)
@retry(exception_type=RequestError, delay=timedelta(minutes=15))
async def get_tetra_league_data() -> None:
league, original = await full_export(with_original=True)
if isinstance(league, FailedModel):
msg = f'排行榜数据请求错误:\n{league.error}'
raise RequestError(msg)
def pps(user: ValidUser) -> float:
return user.league.pps
def apm(user: ValidUser) -> float:
return user.league.apm
def vs(user: ValidUser) -> float:
return user.league.vs
def _min(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
return min(users, key=field)
def _max(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
return max(users, key=field)
def build_extremes_data(
users: list[ValidUser],
field: Callable[[ValidUser], float],
sort: Callable[[list[ValidUser], Callable[[ValidUser], float]], ValidUser],
) -> tuple[dict[str, str], float]:
user = sort(users, field)
return model_dump(User(ID=user.id, name=user.username)), field(user)
data_hash: str | None = await run_sync((await run_sync(sha512)(original)).hexdigest)()
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{data_hash}.json.zst'), mode='wb') as file:
await file.write(await run_sync(ZstdCompressor(level=12, threads=-1).compress)(original))
users = [i for i in league.data.users if isinstance(i, ValidUser)]
rank_to_users: defaultdict[Rank, list[ValidUser]] = defaultdict(list)
for i in users:
rank_to_users[i.league.rank].append(i)
rank_info: list[IORank] = []
for rank, percentile in RANK_PERCENTILE.items():
offset = floor((percentile / 100) * len(users)) - 1
tr_line = users[offset].league.rating
rank_users = rank_to_users[rank]
rank_info.append(
IORank(
rank=rank,
tr_line=tr_line,
player_count=len(rank_users),
low_pps=(build_extremes_data(rank_users, pps, _min)),
low_apm=(build_extremes_data(rank_users, apm, _min)),
low_vs=(build_extremes_data(rank_users, vs, _min)),
avg_pps=mean({i.league.pps for i in rank_users}),
avg_apm=mean({i.league.apm for i in rank_users}),
avg_vs=mean({i.league.vs for i in rank_users}),
high_pps=(build_extremes_data(rank_users, pps, _max)),
high_apm=(build_extremes_data(rank_users, apm, _max)),
high_vs=(build_extremes_data(rank_users, vs, _max)),
update_time=league.cache.cached_at,
file_hash=data_hash,
)
)
async with get_session() as session:
session.add_all(rank_info)
await session.commit()
@driver.on_startup
async def _() -> None:
async with get_session() as session:
latest_time = await session.scalar(select(IORank.update_time).order_by(IORank.id.desc()).limit(1))
if latest_time is None or datetime.now(tz=UTC) - latest_time.replace(tzinfo=UTC) > timedelta(hours=6):
await get_tetra_league_data()

View File

@@ -0,0 +1,72 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .api import Player
from .constant import USER_NAME
def get_player(name: str) -> Player | MessageFormatError:
if USER_NAME.match(name):
return Player(user_name=name, trust=True)
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'top',
Option(
BIND_COMMAND[0],
Args(
Arg(
'account',
get_player,
notice='TOP 用户名',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 TOP 账号',
),
Option(
QUERY_COMMAND[0],
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
'account',
get_player,
notice='TOP 用户名',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 TOP 游戏信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TetrisOnline波兰服 的信息',
example='top绑定scdhh\ntop查我',
compact=True,
fuzzy_match=True,
),
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'TOP'},
)
from . import bind, query # noqa: E402, F401
add_default_handlers(alc)

View File

@@ -0,0 +1,3 @@
from .player import Player
__all__ = ['Player']

View File

@@ -0,0 +1,17 @@
from datetime import datetime
from typing import Literal
from nonebot_plugin_orm import Model
from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from ....db.models import PydanticType
from .schemas.user_profile import UserProfile
class TOPHistoricalData(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
user_unique_identifier: Mapped[str] = mapped_column(String(24), index=True)
api_type: Mapped[Literal['User Profile']] = mapped_column(String(16), index=True)
data: Mapped[UserProfile] = mapped_column(PydanticType(get_model=[], models={UserProfile}))
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)

View File

@@ -0,0 +1,71 @@
from contextlib import suppress
from datetime import datetime, timezone
from io import StringIO
from urllib.parse import urlencode
from lxml import etree
from pandas import read_html
from ....db import anti_duplicate_add
from ....utils.request import Request, splice_url
from ..constant import BASE_URL, USER_NAME
from .models import TOPHistoricalData
from .schemas.user import User
from .schemas.user_profile import Data, UserProfile
UTC = timezone.utc
class Player:
def __init__(self, *, user_name: str, trust: bool = False) -> None:
self.user_name = user_name
if not trust and not USER_NAME.match(self.user_name):
msg = 'Invalid user name'
raise ValueError(msg)
self.__user: User | None = None
self._user_profile: UserProfile | None = None
@property
async def user(self) -> User:
if self.__user is None:
profile = await self.get_profile()
self.__user = User(user_name=profile.user_name)
return self.__user
async def get_profile(self) -> UserProfile:
"""获取用户信息"""
if self._user_profile is None:
url = splice_url([BASE_URL, 'profile.php', f'?{urlencode({"user":self.user_name})}'])
raw_user_profile = await Request.request(url, is_json=False)
self._user_profile = self._parse_profile(raw_user_profile)
await anti_duplicate_add(
TOPHistoricalData,
TOPHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type='User Profile',
data=self._user_profile,
update_time=datetime.now(tz=UTC),
),
)
return self._user_profile
def _parse_profile(self, original_user_profile: bytes) -> UserProfile:
html = etree.HTML(original_user_profile)
user_name = html.xpath('//div[@class="mycontent"]/h1/text()')[0].replace("'s profile", '')
today = None
with suppress(ValueError):
today = Data(
lpm=float(str(html.xpath('//div[@class="mycontent"]/text()[3]')[0]).replace('lpm:', '').strip()),
apm=float(str(html.xpath('//div[@class="mycontent"]/text()[4]')[0]).replace('apm:', '').strip()),
)
table = StringIO(
etree.tostring(
html.xpath('//div[@class="mycontent"]/table[@class="mytable"]')[0],
encoding='utf-8',
).decode()
)
dataframe = read_html(table, encoding='utf-8', header=0)[0]
total: list[Data] = []
for _, value in dataframe.iterrows():
total.append(Data(lpm=value['lpm'], apm=value['apm']))
return UserProfile(user_name=user_name, today=today, total=total)

View File

@@ -0,0 +1,17 @@
from typing import Literal
from typing_extensions import override
from ....schemas import BaseUser
from ...constant import GAME_TYPE
class User(BaseUser):
platform: Literal['TOP'] = GAME_TYPE
user_name: str
@property
@override
def unique_identifier(self) -> str:
return self.user_name

View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
class Data(BaseModel):
lpm: float
apm: float
class UserProfile(BaseModel):
user_name: str
today: Data | None
total: list[Data] | None

View File

@@ -0,0 +1,66 @@
from urllib.parse import urlunparse
from nonebot.adapters import Bot
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.platform import get_platform
from ...utils.render import Bind, render
from ...utils.render.schemas.base import People
from ...utils.screenshot import screenshot
from . import alc
from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
async def _(
bot: Bot,
account: Player,
event_session: EventSession,
bot_info: UserInfo = BotUserInfo(), # noqa: B008
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='bind',
command_args=[],
):
user = await account.user
async with get_session() as session:
bind_status = await create_or_update_bind(
session=session,
chat_platform=get_platform(bot),
chat_account=event_user_info.user_id,
game_platform=GAME_TYPE,
game_account=user.unique_identifier,
)
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
async with HostPage(
await render(
'binding',
Bind(
platform=GAME_TYPE,
status='unknown',
user=People(
avatar=await get_avatar(event_user_info, 'Data URI', None),
name=user.user_name,
),
bot=People(
avatar=await get_avatar(bot_info, 'Data URI', '../../static/logo/logo.svg'),
name=bot_info.user_name,
),
command='top查我',
),
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
).finish()

View File

@@ -0,0 +1,8 @@
from re import compile
from typing import Literal
GAME_TYPE: Literal['TOP'] = 'TOP'
BASE_URL = 'http://tetrisonline.pl/top/'
USER_NAME = compile(r'^[a-zA-Z0-9_]{1,16}$')

View File

@@ -0,0 +1,73 @@
from nonebot.adapters import Bot, Event
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from ...db import query_bind_info, trigger
from ...utils.metrics import get_metrics
from ...utils.platform import get_platform
from ...utils.typing import Me
from ..constant import CANT_VERIFY_MESSAGE
from . import alc
from .api import Player
from .api.schemas.user_profile import UserProfile
from .constant import GAME_TYPE
@alc.assign('query')
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
async with get_session() as session:
bind = await query_bind_info(
session=session,
chat_platform=get_platform(bot),
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
game_platform=GAME_TYPE,
)
if bind is None:
await matcher.finish('未查询到绑定信息')
message = CANT_VERIFY_MESSAGE
await (message + make_query_text(await Player(user_name=bind.game_account, trust=True).get_profile())).finish()
@alc.assign('query')
async def _(account: Player, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
await (make_query_text(await account.get_profile())).finish()
def make_query_text(profile: UserProfile) -> UniMessage:
message = ''
if profile.today is not None:
today = get_metrics(lpm=profile.today.lpm, apm=profile.today.apm)
message += f'用户 {profile.user_name} 24小时内统计数据为: '
message += f"\nL'PM: {today.lpm} ( {today.pps} pps )"
message += f'\nAPM: {today.apm} ( x{today.apl} )'
else:
message += f'用户 {profile.user_name} 暂无24小时内统计数据'
if profile.total is not None:
total_lpm = total_apm = 0.0
for value in profile.total:
total_lpm += value.lpm
total_apm += value.apm
num = len(profile.total)
total = get_metrics(lpm=total_lpm / num, apm=total_apm / num)
message += '\n历史统计数据为: '
message += f"\nL'PM: {total.lpm} ( {total.pps} pps )"
message += f'\nAPM: {total.apm} ( x{total.apl} )'
else:
message += '\n暂无历史统计数据'
return UniMessage(message)

View File

@@ -0,0 +1,79 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .api import Player
from .constant import USER_NAME
def get_player(teaid_or_name: str) -> Player | MessageFormatError:
if (
teaid_or_name.startswith(('onebot-', 'qqguild-', 'kook-', 'discord-'))
and teaid_or_name.split('-', maxsplit=1)[1].isdigit()
):
return Player(teaid=teaid_or_name, trust=True)
if USER_NAME.match(teaid_or_name) and not teaid_or_name.isdigit() and 2 <= len(teaid_or_name) <= 18: # noqa: PLR2004
return Player(user_name=teaid_or_name, trust=True)
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'茶服',
Option(
BIND_COMMAND[0],
Args(
Arg(
'account',
get_player,
notice='茶服 用户名 / TeaID',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 茶服 账号',
),
Option(
QUERY_COMMAND[0],
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
'account',
get_player,
notice='茶服 用户名 / TeaID',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
# 如果放在一个 Union Args 里, 验证顺序不能保证, 可能出错
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 茶服 游戏信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TetrisOnline茶服 的信息',
example='茶服查我',
compact=True,
fuzzy_match=True,
),
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'tos', 'TOS'},
)
from . import bind, query # noqa: E402, F401
add_default_handlers(alc)

View File

@@ -0,0 +1,3 @@
from .player import Player
__all__ = ['Player']

View File

@@ -0,0 +1,20 @@
from datetime import datetime
from typing import Literal
from nonebot_plugin_orm import Model
from sqlalchemy import DateTime, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from ....db.models import PydanticType
from .schemas.user_info import UserInfoSuccess
from .schemas.user_profile import UserProfile
class TOSHistoricalData(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
user_unique_identifier: Mapped[str] = mapped_column(String(24), index=True)
api_type: Mapped[Literal['User Info', 'User Profile']] = mapped_column(String(16), index=True)
data: Mapped[UserInfoSuccess | UserProfile] = mapped_column(
PydanticType(get_model=[], models={UserInfoSuccess, UserProfile})
)
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)

View File

@@ -0,0 +1,128 @@
from datetime import datetime, timezone
from typing import overload
from urllib.parse import urlencode
from httpx import TimeoutException
from nonebot.compat import type_validate_json
from ....db import anti_duplicate_add
from ....utils.exception import RequestError
from ....utils.request import Request, splice_url
from ..constant import BASE_URL, USER_NAME
from .models import TOSHistoricalData
from .schemas.user import User
from .schemas.user_info import UserInfo, UserInfoSuccess
from .schemas.user_profile import UserProfile
UTC = timezone.utc
class Player:
@overload
def __init__(self, *, teaid: str, trust: bool = False): ...
@overload
def __init__(self, *, user_name: str, trust: bool = False): ...
def __init__(self, *, teaid: str | None = None, user_name: str | None = None, trust: bool = False):
self.teaid = teaid
self.user_name = user_name
if not trust:
if self.teaid is not None:
if (
not self.teaid.startswith(('onebot-', 'qqguild-', 'kook-', 'discord-'))
or not self.teaid.split('-', maxsplit=1)[1].isdigit()
):
msg = 'Invalid teaid'
raise ValueError(msg)
elif self.user_name is not None:
if not USER_NAME.match(self.user_name) or self.user_name.isdigit() or 2 > len(self.user_name) > 18: # noqa: PLR2004
msg = 'Invalid user name'
raise ValueError(msg)
else:
msg = 'Invalid user'
raise ValueError(msg)
self.__user: User | None = None
self._user_info: UserInfoSuccess | None = None
self._user_profile: dict[str, UserProfile] = {}
@property
async def user(self) -> User:
if self.__user is None:
user_info = await self.get_info()
self.__user = User(teaid=user_info.data.teaid, name=user_info.data.name)
self.teaid = user_info.data.teaid
self.user_name = user_info.data.name
return self.__user
async def get_info(self) -> UserInfoSuccess:
"""获取用户信息"""
if self._user_info is None:
if self.teaid is not None:
url = [
splice_url(
[
i,
'getTeaIdInfo',
f'?{urlencode({"teaId":self.teaid})}',
]
)
for i in BASE_URL
]
else:
url = [
splice_url(
[
i,
'getUsernameInfo',
f'?{urlencode({"username":self.user_name})}',
]
)
for i in BASE_URL
]
raw_user_info = await Request.failover_request(url, failover_code=[502], failover_exc=(TimeoutException,))
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
if not isinstance(user_info, UserInfoSuccess):
msg = f'用户信息请求错误:\n{user_info.error}'
raise RequestError(msg)
self._user_info = user_info
await anti_duplicate_add(
TOSHistoricalData,
TOSHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type='User Info',
data=user_info,
update_time=datetime.now(UTC),
),
)
return self._user_info
async def get_profile(self, other_parameter: dict[str, str | bytes] | None = None) -> UserProfile:
"""获取用户数据"""
if other_parameter is None:
other_parameter = {}
params = urlencode(dict(sorted(other_parameter.items())))
if self._user_profile.get(params) is None:
raw_user_profile = await Request.failover_request(
[
splice_url(
[
i,
'getProfile',
f'?{urlencode({"id":self.teaid or self.user_name,**other_parameter})}',
]
)
for i in BASE_URL
],
failover_code=[502],
failover_exc=(TimeoutException,),
)
self._user_profile[params] = type_validate_json(UserProfile, raw_user_profile)
await anti_duplicate_add(
TOSHistoricalData,
TOSHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type='User Profile',
data=self._user_profile[params],
update_time=datetime.now(UTC),
),
)
return self._user_profile[params]

View File

@@ -0,0 +1,18 @@
from typing import Literal
from typing_extensions import override
from ....schemas import BaseUser
from ...constant import GAME_TYPE
class User(BaseUser):
platform: Literal['TOS'] = GAME_TYPE
teaid: str
name: str
@property
@override
def unique_identifier(self) -> str:
return self.teaid

View File

@@ -0,0 +1,89 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, Field
class PeriodMatch(BaseModel):
name: str
teaid: str = Field(..., alias='teaId')
rating: str
rd: str
start_time: datetime = Field(..., alias='startTime')
end_time: datetime = Field(..., alias='endTime')
win: str
lose: str
score: str
class UserDataTotalItem(BaseModel):
time_map: str = Field(..., alias='timeMap')
pieces_map: str = Field(..., alias='piecesMap')
clear_lines_map: str = Field(..., alias='clearLinesMap')
attacks_map: str = Field(..., alias='attacksMap')
dig_map: str = Field(..., alias='digMap')
send_map: str = Field(..., alias='sendMap')
rise_map: str = Field(..., alias='riseMap')
offset_map: str = Field(..., alias='offsetMap')
receive_map: str = Field(..., alias='receiveMap')
games_map: str = Field(..., alias='gamesMap')
tetris_map: str = Field(..., alias='tetrisMap')
combo_map: str = Field(..., alias='comboMap')
tspin_map: str = Field(..., alias='tspinMap')
b2b_map: str = Field(..., alias='b2bMap')
perfect_clear_map: str = Field(..., alias='perfectClearMap')
time_no_map: str = Field(..., alias='timeNoMap')
pieces_no_map: str = Field(..., alias='piecesNoMap')
clear_lines_no_map: str = Field(..., alias='clearLinesNoMap')
attacks_no_map: str = Field(..., alias='attacksNoMap')
dig_no_map: str = Field(..., alias='digNoMap')
send_no_map: str = Field(..., alias='sendNoMap')
rise_no_map: str = Field(..., alias='riseNoMap')
offset_no_map: str = Field(..., alias='offsetNoMap')
receive_no_map: str = Field(..., alias='receiveNoMap')
games_no_map: str = Field(..., alias='gamesNoMap')
tetris_no_map: str = Field(..., alias='tetrisNoMap')
combo_no_map: str = Field(..., alias='comboNoMap')
tspin_no_map: str = Field(..., alias='tspinNoMap')
b2b_no_map: str = Field(..., alias='b2bNoMap')
perfect_clear_no_map: str = Field(..., alias='perfectClearNoMap')
class Data(BaseModel):
teaid: str = Field(..., alias='teaId')
name: str
total_exp: str = Field(..., alias='totalExp')
ranking: str
ranked_games: str = Field(..., alias='rankedGames')
rating_now: str = Field(..., alias='ratingNow')
rd_now: str = Field(..., alias='rdNow')
vol_now: str = Field(..., alias='volNow')
rating_last: str = Field(..., alias='ratingLast')
rd_last: str = Field(..., alias='rdLast')
vol_last: str = Field(..., alias='volLast')
period_matches: list[PeriodMatch] = Field(..., alias='periodMatches')
user_data_total: list[UserDataTotalItem] = Field(..., alias='userDataTotal')
ranking_items: str = Field(..., alias='rankingItems')
ranking_game_items: str = Field(..., alias='rankingGameItems')
training_level: str = Field(..., alias='trainingLevel')
training_wins: str = Field(..., alias='trainingWins')
pb_sprint: str = Field(..., alias='PBSprint')
pb_marathon: str = Field(..., alias='PBMarathon')
pb_challenge: str = Field(..., alias='PBChallenge')
register_date: datetime = Field(..., alias='registerDate')
last_login_date: datetime = Field(..., alias='lastLoginDate')
class UserInfoSuccess(BaseModel):
code: int
success: Literal[True]
data: Data
class FailedModel(BaseModel):
code: int
success: Literal[False]
error: str
UserInfo = UserInfoSuccess | FailedModel

View File

@@ -0,0 +1,34 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
class Data(BaseModel):
idmultiplayergameresult: int
iduser: str
teaid: str
time: int
clear_lines: int
attack: int
send: int
offset: int
receive: int
rise: int
dig: int
pieces: int
max_combo: int
pc_count: int
place: int
num_players: int
fumen_code: Literal['0', '1'] # wtf
rule_set: str
garbage: str
idmultiplayergame: int
datetime: datetime
class UserProfile(BaseModel):
code: int
success: bool
data: list[Data]

View File

@@ -0,0 +1,66 @@
from urllib.parse import urlunparse
from nonebot.adapters import Bot
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.platform import get_platform
from ...utils.render import Bind, render
from ...utils.render.schemas.base import People
from ...utils.screenshot import screenshot
from . import alc
from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
async def _(
bot: Bot,
account: Player,
event_session: EventSession,
bot_info: UserInfo = BotUserInfo(), # noqa: B008
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='bind',
command_args=[],
):
user = await account.user
async with get_session() as session:
bind_status = await create_or_update_bind(
session=session,
chat_platform=get_platform(bot),
chat_account=event_user_info.user_id,
game_platform=GAME_TYPE,
game_account=user.unique_identifier,
)
user_info = await account.get_info()
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
async with HostPage(
await render(
'binding',
Bind(
platform=GAME_TYPE,
status='unknown',
user=People(
avatar=await get_avatar(event_user_info, 'Data URI', None), name=user_info.data.name
),
bot=People(
avatar=await get_avatar(bot_info, 'Data URI', '../../static/logo/logo.svg'),
name=bot_info.user_remark or bot_info.user_displayname or bot_info.user_name,
),
command='茶服查我',
),
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
).finish()

View File

@@ -0,0 +1,16 @@
from re import compile
from typing import Literal
GAME_TYPE: Literal['TOS'] = 'TOS'
BASE_URL = {
'https://teatube.cn:8888/',
'http://cafuuchino1.studio26f.org:19970',
'http://cafuuchino2.studio26f.org:19970',
'http://cafuuchino3.studio26f.org:19970',
'http://cafuuchino4.studio26f.org:19970',
}
USER_NAME = compile(
r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$'
)

View File

@@ -0,0 +1,170 @@
from asyncio import gather
from dataclasses import dataclass
from typing import Literal
from nonebot.adapters import Bot, Event
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from ...db import query_bind_info, trigger
from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics
from ...utils.platform import get_platform
from ...utils.typing import Me
from ..constant import CANT_VERIFY_MESSAGE
from . import alc
from .api import Player
from .constant import GAME_TYPE
def add_special_handlers(
teaid_prefix: Literal['onebot-', 'kook-', 'discord-', 'qqguild-'], match_event: type[Event]
) -> None:
@alc.assign('query')
async def _(event: Event, target: At | Me, event_session: EventSession):
if isinstance(event, match_event):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
await (
await make_query_text(
Player(
teaid=f'{teaid_prefix}{target.target}'
if isinstance(target, At)
else f'{teaid_prefix}{event.get_user_id()}',
trust=True,
)
)
).finish()
try:
from nonebot.adapters.onebot.v11 import MessageEvent as OB11MessageEvent
add_special_handlers('onebot-', OB11MessageEvent)
except ImportError:
pass
try:
from nonebot.adapters.qq.event import GuildMessageEvent as QQGuildMessageEvent
from nonebot.adapters.qq.event import QQMessageEvent
add_special_handlers('qqguild-', QQGuildMessageEvent)
add_special_handlers('onebot-', QQMessageEvent)
except ImportError:
pass
try:
from nonebot.adapters.kaiheila.event import MessageEvent as KookMessageEvent
add_special_handlers('kook-', KookMessageEvent)
except ImportError:
pass
try:
from nonebot.adapters.discord import MessageEvent as DiscordMessageEvent
add_special_handlers('discord-', DiscordMessageEvent)
except ImportError:
pass
@alc.assign('query')
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
async with get_session() as session:
bind = await query_bind_info(
session=session,
chat_platform=get_platform(bot),
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
game_platform=GAME_TYPE,
)
if bind is None:
await matcher.finish('未查询到绑定信息')
message = CANT_VERIFY_MESSAGE
await (message + await make_query_text(Player(teaid=bind.game_account, trust=True))).finish()
@alc.assign('query')
async def _(account: Player, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
await (await make_query_text(account)).finish()
@dataclass
class GameData:
game_num: int
metrics: TetrisMetricsProWithLPMADPM
async def get_game_data(player: Player, query_num: int = 50) -> GameData | None:
"""获取游戏数据"""
user_profile = await player.get_profile()
if user_profile.data == []:
return None
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = 0.0
num = 0
for i in user_profile.data:
# 排除单人局和时间为0的游戏
# 茶: 不计算没挖掘的局, 即使apm和lpm也如此
if i.num_players == 1 or i.time == 0 or i.dig is None:
continue
# 加权计算
time = i.time / 1000
lpm = 24 * (i.pieces / time)
apm = (i.attack / time) * 60
adpm = ((i.attack + i.dig) / time) * 60
weighted_total_lpm += lpm * time
weighted_total_apm += apm * time
weighted_total_adpm += adpm * time
total_time += time
num += 1
if num >= query_num:
break
if num == 0:
return None
# TODO: 如果有效局数小于 {查询数} , 并且没有无dig信息的局, 且 user_profile.data 内有{请求数}个局, 则继续往前获取信息
metrics = get_metrics(
lpm=weighted_total_lpm / total_time, apm=weighted_total_apm / total_time, adpm=weighted_total_adpm / total_time
)
lpm = weighted_total_lpm / total_time
apm = weighted_total_apm / total_time
adpm = weighted_total_adpm / total_time
return GameData(game_num=num, metrics=metrics)
async def make_query_text(player: Player) -> UniMessage:
user_info, game_data = await gather(player.get_info(), get_game_data(player))
user_data = user_info.data
message = f'用户 {user_data.name} ({user_data.teaid}) '
if user_data.ranked_games == '0':
message += '暂无段位统计数据'
else:
message += f', 段位分 {round(float(user_data.rating_now),2)}±{round(float(user_data.rd_now),2)} ({round(float(user_data.vol_now),2)}) '
if game_data is None:
message += ', 暂无游戏数据'
else:
message += f', 最近 {game_data.game_num} 局数据'
message += f"\nL'PM: {game_data.metrics.lpm} ( {game_data.metrics.pps} pps )"
message += f'\nAPM: {game_data.metrics.apm} ( x{game_data.metrics.apl} )'
message += f'\nADPM: {game_data.metrics.adpm} ( x{game_data.metrics.adpl} ) ( {game_data.metrics.vs}vs )'
message += f'\n40L: {float(user_data.pb_sprint)/1000:.2f}s' if user_data.pb_sprint != '2147483647' else ''
message += f'\nMarathon: {user_data.pb_marathon}' if user_data.pb_marathon != '0' else ''
message += f'\nChallenge: {user_data.pb_challenge}' if user_data.pb_challenge != '0' else ''
return UniMessage(message)