Compare commits

...

26 Commits

Author SHA1 Message Date
a1ad86d0c7 🔖 1.2.15 2024-06-11 02:02:25 +08:00
e6260ce170 添加 TETR.IO rank 的快捷指令 2024-06-11 02:01:23 +08:00
b0e53bc8c8 🔖 1.2.14 2024-06-10 12:03:34 +08:00
2267bc8f14 🐛 修复快捷方式 2024-06-10 12:03:04 +08:00
607a0927bc TETR.IO 默认模板可配置 2024-06-10 11:57:01 +08:00
7b3ca9eb2a 🚚 TETRIOUserConfig 写错地方了 2024-06-10 11:17:49 +08:00
37c12e439c 🔖 1.2.13 2024-06-10 10:58:19 +08:00
504579710e TETR.IO 适配 v2模板 2024-06-10 10:56:56 +08:00
ce94aee0f4 🔖 1.2.12 2024-06-10 02:37:29 +08:00
b9c58ae125 修改插件元数据 2024-06-10 02:37:12 +08:00
92159e93b8 直接使用 fstring 生成链接 2024-06-10 02:35:07 +08:00
f9b11895e2 使用缓存 2024-06-10 02:34:30 +08:00
f7c3d493ea TETR.IO 适配 v2模板 2024-06-10 02:13:03 +08:00
4954ab3d60 TETR.IO 适配 v2模板 2024-06-10 00:48:13 +08:00
bcca869e72 重置命令为shell style,并使用快捷方式保留之前的行为 2024-06-10 00:47:17 +08:00
a4247abdad 添加 TETR.IO 部分资源的缓存 2024-06-10 00:39:36 +08:00
2c1d43601a 🙈 更新.gitignore 2024-06-10 00:31:30 +08:00
c929c463ec 添加 img_to_png 方法 2024-06-10 00:30:54 +08:00
314e1dede3 🔥 去除截图等待状态时的超时 2024-06-10 00:26:26 +08:00
d5b0ef34c5 更新 v2 模板模型 2024-06-10 00:25:21 +08:00
3d9ef841b1 添加两个新异常类型 2024-06-10 00:09:41 +08:00
b98871f170 🔥 删除 HandleNotFinishedError 2024-06-10 00:07:03 +08:00
38ab872dd8 🗃️ 添加 TETRIOUserConfig 2024-06-09 04:24:48 +08:00
f44c0baa2e 🚨 添加一些 type: ignore 2024-06-09 03:59:08 +08:00
9b8d17577e 🔥 删除 get_platform 函数 2024-06-08 12:13:20 +08:00
f301bee2b0 🔥 不小心把测试用注释传上去了) 2024-06-08 12:12:28 +08:00
27 changed files with 502 additions and 253 deletions

1
.gitignore vendored
View File

@@ -19,3 +19,4 @@ package-lock.json
bot.py
TODO
*.fish
extracted_skin_mino_*

View File

@@ -19,7 +19,7 @@ from .config.config import migrations # noqa: E402
__plugin_meta__ = PluginMetadata(
name='Tetris Stats',
description='一个用于查询 Tetris 相关游戏玩家数据的插件',
usage='发送 {游戏名} --help 查询使用方法',
usage='发送 tstats --help 查询使用方法',
type='application',
homepage='https://github.com/A-minos/nonebot-plugin-tetris-stats',
extra={

View File

@@ -46,7 +46,9 @@ def upgrade(name: str = '') -> None: # noqa: C901
TimeRemainingColumn,
)
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseProcessedData # type: ignore[attr-defined]
from nonebot_plugin_tetris_stats.game_data_processor.schemas import ( # type: ignore[import-untyped]
BaseProcessedData,
)
Base = automap_base() # noqa: N806
Base.prepare(autoload_with=op.get_bind())

View File

@@ -0,0 +1,44 @@
"""Add TETRIO user configuration
迁移 ID: a1195e989cc6
父迁移: b15844837693
创建时间: 2024-06-09 04:20:07.819194
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = 'a1195e989cc6'
down_revision: str | Sequence[str] | None = 'b15844837693'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'nonebot_plugin_tetris_stats_tetriouserconfig',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('query_template', sa.String(length=2), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetriouserconfig')),
info={'bind_key': 'nonebot_plugin_tetris_stats'},
)
# ### end Alembic commands ###
def downgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('nonebot_plugin_tetris_stats_tetriouserconfig')
# ### end Alembic commands ###

View File

@@ -46,7 +46,7 @@ def upgrade(name: str = '') -> None:
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser # type: ignore[import-untyped]
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
batch_op.add_column(sa.Column('user_unique_identifier', sa.String(length=32), nullable=True))

View File

@@ -1,40 +1,53 @@
from typing import Any
from collections.abc import Callable
from nonebot.adapters import Bot
from nonebot.exception import FinishedException
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor
from nonebot_plugin_alconna import AlcMatches, AlconnaMatcher, At
from nonebot.typing import T_Handler
from nonebot_plugin_alconna import AlcMatches, Alconna, At, CommandMeta, on_alconna
from .. import ns
from ..utils.exception import MessageFormatError, NeedCatchError
alc = on_alconna(
Alconna(
['tetris-stats', 'tstats'],
namespace=ns,
meta=CommandMeta(
description='俄罗斯方块相关游戏数据查询',
fuzzy_match=True,
),
),
skip_for_unmatch=False,
auto_send_output=True,
use_origin=True,
)
def add_default_handlers(matcher: type[AlconnaMatcher]) -> None:
@matcher.assign('query')
def add_block_handlers(handler: Callable[[T_Handler], T_Handler]) -> None:
@handler
async def _(bot: Bot, matcher: Matcher, target: At):
if isinstance(target, At) and target.target == bot.self_id:
await matcher.finish('不能查询bot的信息')
@matcher.handle()
async def _(matcher: Matcher, account: MessageFormatError):
await matcher.finish(str(account))
@matcher.handle()
async def _(matcher: Matcher, matches: AlcMatches):
if matches.head_matched and matches.options != {} or matches.main_args == {}:
await matcher.finish(
(f'{matches.error_info!r}\n' if matches.error_info is not None else '')
+ f'输入"{matches.header_result} --help"查看帮助'
)
@matcher.handle()
def _(other: Any): # noqa: ANN401, ARG001
raise FinishedException
from . import tetrio, top, tos # noqa: F401, E402
@alc.handle()
async def _(matcher: Matcher, account: MessageFormatError):
await matcher.finish(str(account))
@alc.handle()
async def _(matcher: Matcher, matches: AlcMatches):
if matches.head_matched and matches.options != {} or matches.main_args == {}:
await matcher.finish(
(f'{matches.error_info!r}\n' if matches.error_info is not None else '')
+ f'输入"{matches.header_result} --help"查看帮助'
)
@run_postprocessor
async def _(matcher: Matcher, exception: NeedCatchError):
await matcher.send(str(exception))

View File

@@ -1,14 +1,13 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from arclet.alconna import Arg, ArgFlag, Args, Option, Subcommand
from nonebot_plugin_alconna import At
from ... import ns
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .. import add_block_handlers, alc
from .api import Player
from .api.typing import Rank
from .api.typing import ValidRank
from .constant import USER_ID, USER_NAME
from .typing import Template
def get_player(user_id_or_name: str) -> Player | MessageFormatError:
@@ -19,69 +18,72 @@ def get_player(user_id_or_name: str) -> Player | MessageFormatError:
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'io',
Option(
BIND_COMMAND[0],
alc.command.add(
Subcommand(
'TETR.IO',
Subcommand(
'bind',
Args(
Arg(
'account',
get_player,
notice='IO 用户名 / ID',
notice='TETR.IO 用户名 / ID',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 IO 账号',
help_text='绑定 TETR.IO 账号',
),
Option(
QUERY_COMMAND[0],
Subcommand(
'query',
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
notice='@想要查询的人 / 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
'account',
get_player,
notice='IO 用户名 / ID',
notice='TETR.IO 用户名 / ID',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 IO 游戏信息',
Option(
'--template',
Arg('template', Template),
alias=['-T'],
help_text='要使用的查询模板',
),
help_text='查询 TETR.IO 游戏信息',
),
Option(
Subcommand(
'rank',
Args(Arg('rank', Rank, notice='IO 段位')),
alias={'Rank', 'RANK', '段位'},
compact=True,
dest='rank',
help_text='查询 IO 段位信息',
Args(Arg('rank', ValidRank, notice='TETR.IO 段位')),
help_text='查询 TETR.IO 段位信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TETR.IO 的信息',
example='io绑定scdhh\nio查我\niorankx',
compact=True,
fuzzy_match=True,
Subcommand(
'config',
Option(
'--default-template',
Arg('template', Template),
alias=['-DT', 'DefaultTemplate'],
),
),
namespace=ns,
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'IO'},
dest='TETRIO',
help_text='TETR.IO 游戏相关指令',
)
)
alc.shortcut('fkosk', {'command': 'io查', 'args': [''], 'fuzzy': False, 'humanized': 'An Easter egg!'})
alc.shortcut('(?i:io)(?i:绑定|绑|bind)', {'command': 'tstats TETR.IO bind', 'humanized': 'io绑定'})
alc.shortcut('(?i:io)(?i:查询|查|query|stats)', {'command': 'tstats TETR.IO query', 'humanized': 'io查'})
alc.shortcut('(?i:io)(?i:段位|段|rank)', {'command': 'tstats TETR.IO rank', 'humanized': 'iorank'})
alc.shortcut('(?i:io)(?i:配置|配|config)', {'command': 'tstats TETR.IO config', 'humanized': 'io配置'})
from . import bind, query, rank # noqa: F401, E402
alc.shortcut(
'fkosk', {'command': 'tstats TETR.IO query', 'args': [''], 'fuzzy': False, 'humanized': 'An Easter egg!'}
)
add_default_handlers(alc)
add_block_handlers(alc.assign('TETRIO.query'))
from . import bind, config, query, rank # noqa: F401, E402

View File

@@ -1,6 +1,6 @@
from typing import Literal
Rank = Literal[
ValidRank = Literal[
'x',
'u',
'ss',
@@ -18,5 +18,6 @@ Rank = Literal[
'c-',
'd+',
'd',
'z', # 未定级
]
Rank = ValidRank | Literal['z'] # 未定级

View File

@@ -1,5 +1,6 @@
from asyncio import gather
from hashlib import md5
from urllib.parse import urlunparse
from urllib.parse import urlencode
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
@@ -9,8 +10,8 @@ from nonebot_plugin_user import User # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.image import get_avatar
from ...utils.render import Bind, render
from ...utils.render.schemas.base import Avatar, People
from ...utils.screenshot import screenshot
@@ -19,7 +20,7 @@ from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
@alc.assign('TETRIO.bind')
async def _(nb_user: User, account: Player, event_session: EventSession, bot_info: UserInfo = BotUserInfo()): # noqa: B008
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
@@ -27,7 +28,7 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
command_type='bind',
command_args=[],
):
user = await account.user
user, user_info = await gather(account.user, account.get_info())
async with get_session() as session:
bind_status = await create_or_update_bind(
session=session,
@@ -35,8 +36,8 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
game_platform=GAME_TYPE,
game_account=user.unique_identifier,
)
user_info = await account.get_info()
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
netloc = get_self_netloc()
async with HostPage(
await render(
'v1/binding',
@@ -44,7 +45,7 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
platform='TETR.IO',
status='unknown',
user=People(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
if user_info.data.user.avatar_revision is not None
else Avatar(type='identicon', hash=md5(user_info.data.user.id.encode()).hexdigest()), # noqa: S324
name=user_info.data.user.username.upper(),
@@ -57,6 +58,4 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
),
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
).finish()
await UniMessage.image(raw=await screenshot(f'http://{netloc}/host/{page_hash}.html')).finish()

View File

@@ -0,0 +1,20 @@
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import async_scoped_session
from nonebot_plugin_user import User # type: ignore[import-untyped]
from sqlalchemy import select
from . import alc
from .models import TETRIOUserConfig
from .typing import Template
@alc.assign('TETRIO.config')
async def _(user: User, session: async_scoped_session, template: Template):
config = (await session.scalars(select(TETRIOUserConfig).where(TETRIOUserConfig.id == user.id))).one_or_none()
if config is None:
config = TETRIOUserConfig(id=user.id, query_template=template)
session.add(config)
else:
config.query_template = template
await session.commit()
await UniMessage('配置成功').finish()

View File

@@ -5,6 +5,7 @@ from sqlalchemy import JSON, DateTime, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from .api.typing import Rank
from .typing import Template
class IORank(MappedAsDataclass, Model):
@@ -26,3 +27,8 @@ class IORank(MappedAsDataclass, Model):
index=True,
)
file_hash: Mapped[str | None] = mapped_column(String(128), index=True)
class TETRIOUserConfig(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(primary_key=True)
query_template: Mapped[Template] = mapped_column(String(2))

View File

@@ -1,17 +1,16 @@
from asyncio import gather
from collections import defaultdict
from contextlib import suppress
from datetime import date, datetime, timedelta, timezone
from hashlib import md5
from math import ceil, floor
from typing import ClassVar
from urllib.parse import urlunparse
from typing import ClassVar, TypeVar, overload
from urllib.parse import urlencode
from zoneinfo import ZoneInfo
from aiofiles import open
from nonebot import get_driver
from nonebot.adapters import Event
from nonebot.compat import type_validate_json
from nonebot.compat import model_dump, type_validate_json
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna.uniseg import UniMessage
@@ -20,16 +19,24 @@ from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyp
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from nonebot_plugin_user import User as NBUser # type: ignore[import-untyped]
from nonebot_plugin_user import get_user # type: ignore[import-untyped]
from sqlalchemy import select
from zstandard import ZstdDecompressor
from ...db import query_bind_info, trigger
from ...utils.exception import FallbackError
from ...utils.host import HostPage, get_self_netloc
from ...utils.metrics import TetrisMetricsProWithPPSVS, get_metrics
from ...utils.render import render
from ...utils.render.schemas.base import Avatar, Ranking
from ...utils.render.schemas.tetrio_info import Data, Info, Radar, TetraLeague, TetraLeagueHistory
from ...utils.render.schemas.tetrio_info import User as TemplateUser
from ...utils.render.schemas.tetrio_info import Data, Radar, TetraLeague, TetraLeagueHistory
from ...utils.render.schemas.tetrio_info import Info as V1TemplateInfo
from ...utils.render.schemas.tetrio_info import User as V1TemplateUser
from ...utils.render.schemas.tetrio_info_v2 import Badge, Blitz, Sprint, Statistic, TetraLeagueStatistic, Zen
from ...utils.render.schemas.tetrio_info_v2 import Info as V2TemplateInfo
from ...utils.render.schemas.tetrio_info_v2 import TetraLeague as V2TemplateTetraLeague
from ...utils.render.schemas.tetrio_info_v2 import User as V2TemplateUser
from ...utils.screenshot import screenshot
from ...utils.typing import Me, Number
from ..constant import CANT_VERIFY_MESSAGE
@@ -38,17 +45,25 @@ from .api import Player, User, UserInfoSuccess
from .api.models import TETRIOHistoricalData
from .api.schemas.tetra_league import TetraLeagueSuccess
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
from .api.schemas.user_records import SoloModeRecord, SoloRecord
from .api.schemas.user_records import SoloModeRecord, UserRecordsSuccess
from .constant import GAME_TYPE, TR_MAX, TR_MIN
from .model import IORank
from .models import IORank, TETRIOUserConfig
from .typing import Template
UTC = timezone.utc
driver = get_driver()
@alc.assign('query')
async def _(event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
@alc.assign('TETRIO.query')
async def _( # noqa: PLR0913
user: NBUser,
event: Event,
matcher: Matcher,
target: At | Me,
event_session: EventSession,
template: Template | None = None,
):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
@@ -63,34 +78,31 @@ async def _(event: Event, matcher: Matcher, target: At | Me, event_session: Even
),
game_platform=GAME_TYPE,
)
if template is None:
template = await session.scalar(
select(TETRIOUserConfig.query_template).where(TETRIOUserConfig.id == user.id)
)
if bind is None:
await matcher.finish('未查询到绑定信息')
message = UniMessage(CANT_VERIFY_MESSAGE)
player = Player(user_id=bind.game_account, trust=True)
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
with suppress(TypeError):
message.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
await message.finish()
message += make_query_text(user_info, sprint, blitz)
await message.finish()
await (message + (await make_query_result(player, template or 'v1'))).finish()
@alc.assign('query')
async def _(account: Player, event_session: EventSession):
@alc.assign('TETRIO.query')
async def _(user: NBUser, account: Player, event_session: EventSession, template: Template | None = None):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='query',
command_args=[],
):
user, user_info, user_records = await gather(account.user, account.get_info(), account.get_records())
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
with suppress(TypeError):
await UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)).finish()
await make_query_text(user_info, sprint, blitz).finish()
async with get_session() as session:
if template is None:
template = await session.scalar(
select(TETRIOUserConfig.query_template).where(TETRIOUserConfig.id == user.id)
)
await (await make_query_result(account, template or 'v1')).finish()
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
@@ -212,13 +224,40 @@ async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[
return histories
async def make_query_image(
user: User, user_info: UserInfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None
) -> bytes:
L = TypeVar('L', NeverPlayedLeague, NeverRatedLeague, RatedLeague)
@overload
def get_league(user_info: UserInfoSuccess, league_type: type[L]) -> L: ...
@overload
def get_league(
user_info: UserInfoSuccess, league_type: None = None
) -> NeverPlayedLeague | NeverRatedLeague | RatedLeague: ...
def get_league(
user_info: UserInfoSuccess, league_type: type[L] | None = None
) -> L | NeverPlayedLeague | NeverRatedLeague | RatedLeague:
league = user_info.data.user.league
if not isinstance(league, RatedLeague) or league.vs is None:
raise TypeError
user_name = user_info.data.user.username.upper()
if league_type is None:
return league
if isinstance(league, league_type):
return league
raise FallbackError
def get_sprint(user_records: UserRecordsSuccess) -> SoloModeRecord:
return user_records.data.records.sprint
def get_blitz(user_records: UserRecordsSuccess) -> SoloModeRecord:
return user_records.data.records.blitz
async def make_query_image_v1(player: Player) -> bytes:
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
league = get_league(user_info, RatedLeague)
sprint, blitz = get_sprint(user_records).record, get_blitz(user_records).record
if league.vs is None:
raise FallbackError
histories = await query_historical_data(user, user_info)
value_max, value_min = get_value_bounds([i.tr for i in histories])
split_value, offset = get_split(value_max, value_min)
@@ -228,18 +267,19 @@ async def make_query_image(
else:
sprint_value = 'N/A'
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
netloc = get_self_netloc()
async with HostPage(
await render(
page=await render(
'v1/tetrio/info',
Info(
user=TemplateUser(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
V1TemplateInfo(
user=V1TemplateUser(
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
if user_info.data.user.avatar_revision is not None
else Avatar(
type='identicon',
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
),
name=user_name,
name=user.name.upper(),
bio=user_info.data.user.bio,
),
ranking=Ranking(
@@ -277,12 +317,130 @@ async def make_query_image(
),
)
) as page_hash:
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
return await screenshot(f'http://{netloc}/host/{page_hash}.html')
def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: SoloModeRecord) -> UniMessage:
league = user_info.data.user.league
user_name = user_info.data.user.username.upper()
N = TypeVar('N', int, float)
def handling_special_value(value: N) -> N | None:
return value if value != -1 else None
async def make_query_image_v2(player: Player) -> bytes:
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
league = get_league(user_info)
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
if sprint.record is not None:
duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds()
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
else:
sprint_value = 'N/A'
play_time: str | None
if (game_time := handling_special_value(user_info.data.user.gametime)) is not None:
if game_time // 3600 > 0:
play_time = f'{game_time//3600:.0f}h {game_time % 3600 // 60:.0f}m {game_time % 60:.0f}s'
elif game_time // 60 > 0:
play_time = f'{game_time//60:.0f}m {game_time % 60:.0f}s'
else:
play_time = f'{game_time:.0f}s'
else:
play_time = game_time
netloc = get_self_netloc()
async with HostPage(
await render(
'v2/tetrio/info',
V2TemplateInfo(
user=V2TemplateUser(
id=user.ID,
name=user.name.upper(),
bio=user_info.data.user.bio,
banner=f'http://{netloc}/host/resource/tetrio/banners/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
if user_info.data.user.banner_revision is not None and user_info.data.user.banner_revision != 0
else None,
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
if user_info.data.user.avatar_revision is not None
else Avatar(
type='identicon',
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
),
badges=[
Badge(
id=i.id,
description=i.label,
group=i.group,
receive_at=i.ts if isinstance(i.ts, datetime) else None,
)
for i in user_info.data.user.badges
],
country=user_info.data.user.country,
xp=user_info.data.user.xp,
friend_count=user_info.data.user.friend_count or 0,
supporter_tier=user_info.data.user.supporter_tier,
bad_standing=user_info.data.user.badstanding or False,
verified=user_info.data.user.verified,
playtime=play_time,
join_at=user_info.data.user.ts,
),
tetra_league=V2TemplateTetraLeague(
rank=league.rank,
highest_rank=league.bestrank,
tr=round(league.rating, 2),
glicko=round(league.glicko, 2),
rd=round(league.rd, 2),
global_rank=handling_special_value(league.standing),
country_rank=handling_special_value(league.standing_local),
pps=(
metrics := get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
if league.vs is not None
else get_metrics(pps=league.pps, apm=league.apm)
).pps,
apm=metrics.apm,
apl=metrics.apl,
vs=metrics.vs if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
adpl=metrics.adpl if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
statistic=TetraLeagueStatistic(
total=league.gamesplayed,
wins=league.gameswon,
),
decaying=league.decaying,
)
if isinstance(league, RatedLeague)
else None,
statistic=Statistic(
total=handling_special_value(user_info.data.user.gamesplayed),
wins=handling_special_value(user_info.data.user.gameswon),
),
sprint=Sprint(
time=sprint_value,
global_rank=sprint.rank,
play_at=sprint.record.ts,
)
if sprint.record is not None
else None,
blitz=Blitz(
score=blitz.record.endcontext.score,
global_rank=blitz.rank,
play_at=blitz.record.ts,
)
if blitz.record is not None
else None,
zen=Zen.model_validate(model_dump(user_records.data.zen)),
),
),
) as page_hash:
return await screenshot(f'http://{netloc}/host/{page_hash}.html')
async def make_query_text(player: Player) -> UniMessage:
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
league = get_league(user_info)
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
user_name = user.name.upper()
message = ''
if isinstance(league, NeverPlayedLeague):
message += f'用户 {user_name} 没有排位统计数据'
@@ -295,12 +453,15 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S
else:
message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})'
message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:'
lpm = league.pps * 24
message += f"\nL'PM: {round(lpm, 2)} ( {league.pps} pps )"
message += f'\nAPM: {league.apm} ( x{round(league.apm/lpm,2)} )'
if league.vs is not None:
adpm = league.vs * 0.6
message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )'
metrics = (
get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
if league.vs is not None
else get_metrics(pps=league.pps, apm=league.apm)
)
message += f"\nL'PM: {metrics.lpm} ( {metrics.pps} pps )"
message += f'\nAPM: {metrics.apm} ( x{metrics.apl} )'
if isinstance(metrics, TetrisMetricsProWithPPSVS):
message += f'\nADPM: {metrics.adpm} ( x{metrics.adpl} ) ( {metrics.vs}vs )'
if sprint.record is not None:
message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
@@ -310,6 +471,17 @@ def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: S
return UniMessage(message)
async def make_query_result(player: Player, template: Template) -> UniMessage:
try:
if template == 'v1':
return UniMessage.image(raw=await make_query_image_v1(player))
if template == 'v2':
return UniMessage.image(raw=await make_query_image_v2(player))
except FallbackError:
...
return await make_query_text(player)
class FullExport:
cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set)
latest_update: ClassVar[date | None] = None

View File

@@ -30,14 +30,14 @@ from .api.schemas.user import User
from .api.tetra_league import full_export
from .api.typing import Rank
from .constant import GAME_TYPE, RANK_PERCENTILE
from .model import IORank
from .models import IORank
UTC = timezone.utc
driver = get_driver()
@alc.assign('rank')
@alc.assign('TETRIO.rank')
async def _(matcher: Matcher, rank: Rank, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),

View File

@@ -0,0 +1,3 @@
from typing import Literal
Template = Literal['v1', 'v2']

View File

@@ -1,11 +1,9 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from arclet.alconna import Arg, ArgFlag, Args, Subcommand
from nonebot_plugin_alconna import At
from ... import ns
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .. import add_block_handlers, alc
from .api import Player
from .constant import USER_NAME
@@ -16,31 +14,28 @@ def get_player(name: str) -> Player | MessageFormatError:
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'top',
Option(
BIND_COMMAND[0],
alc.command.add(
Subcommand(
'TOP',
Subcommand(
'bind',
Args(
Arg(
'account',
get_player,
notice='TOP 用户名',
notice='TOP 用户名 / ID',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 TOP 账号',
),
Option(
QUERY_COMMAND[0],
Subcommand(
'query',
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
notice='@想要查询的人 / 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
@@ -50,25 +45,15 @@ alc = on_alconna(
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 TOP 游戏信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TetrisOnline波兰服 的信息',
example='top绑定scdhh\ntop查我',
compact=True,
fuzzy_match=True,
),
namespace=ns,
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'TOP'},
help_text='TOP 游戏相关指令',
)
)
from . import bind, query # noqa: E402, F401
alc.shortcut('(?i:top)(?i:绑定|绑|bind)', {'command': 'tstats TOP bind', 'humanized': 'top绑定'})
alc.shortcut('(?i:top)(?i:查询|查|query|stats)', {'command': 'tstats TOP query', 'humanized': 'top查'})
add_default_handlers(alc)
add_block_handlers(alc.assign('TOP.query'))
from . import bind, query # noqa: E402, F401

View File

@@ -1,5 +1,3 @@
from urllib.parse import urlunparse
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
@@ -8,8 +6,8 @@ from nonebot_plugin_user import User # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.image import get_avatar
from ...utils.render import Bind, render
from ...utils.render.schemas.base import People
from ...utils.screenshot import screenshot
@@ -18,7 +16,7 @@ from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
@alc.assign('TOP.bind')
async def _(
nb_user: User,
account: Player,
@@ -60,5 +58,5 @@ async def _(
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
).finish()

View File

@@ -17,7 +17,7 @@ from .api.schemas.user_profile import UserProfile
from .constant import GAME_TYPE
@alc.assign('query')
@alc.assign('TOP.query')
async def _(event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
@@ -39,7 +39,7 @@ async def _(event: Event, matcher: Matcher, target: At | Me, event_session: Even
await (message + make_query_text(await Player(user_name=bind.game_account, trust=True).get_profile())).finish()
@alc.assign('query')
@alc.assign('TOP.query')
async def _(account: Player, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),

View File

@@ -1,11 +1,9 @@
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
from nonebot_plugin_alconna import At, on_alconna
from arclet.alconna import Arg, ArgFlag, Args, Subcommand
from nonebot_plugin_alconna import At
from ... import ns
from ...utils.exception import MessageFormatError
from ...utils.typing import Me
from .. import add_default_handlers
from ..constant import BIND_COMMAND, QUERY_COMMAND
from .. import add_block_handlers, alc
from .api import Player
from .constant import USER_NAME
@@ -21,31 +19,28 @@ def get_player(teaid_or_name: str) -> Player | MessageFormatError:
return MessageFormatError('用户名/ID不合法')
alc = on_alconna(
Alconna(
'茶服',
Option(
BIND_COMMAND[0],
alc.command.add(
Subcommand(
'TOS',
Subcommand(
'bind',
Args(
Arg(
'account',
get_player,
notice='茶服 用户名 / TeaID',
notice='茶服 用户名 / ID',
flags=[ArgFlag.HIDDEN],
)
),
alias=BIND_COMMAND[1:],
compact=True,
dest='bind',
help_text='绑定 茶服 账号',
),
Option(
QUERY_COMMAND[0],
Subcommand(
'query',
Args(
Arg(
'target',
At | Me,
notice='@想要查询的人 | 自己',
notice='@想要查询的人 / 自己',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
Arg(
@@ -54,28 +49,16 @@ alc = on_alconna(
notice='茶服 用户名 / TeaID',
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
),
# 如果放在一个 Union Args 里, 验证顺序不能保证, 可能出错
),
alias=QUERY_COMMAND[1:],
compact=True,
dest='query',
help_text='查询 茶服 游戏信息',
),
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
meta=CommandMeta(
description='查询 TetrisOnline茶服 的信息',
example='茶服查我',
compact=True,
fuzzy_match=True,
),
namespace=ns,
),
skip_for_unmatch=False,
auto_send_output=True,
aliases={'tos', 'TOS'},
help_text='茶服 游戏相关指令',
)
)
alc.shortcut('(?i:tos|茶服)(?i:绑定|绑|bind)', {'command': 'tstats TOS bind', 'humanized': '茶服绑定'})
alc.shortcut('(?i:tos|茶服)(?i:查询|查|query|stats)', {'command': 'tstats TOS query', 'humanized': '茶服查'})
add_block_handlers(alc.assign('TOS.query'))
from . import bind, query # noqa: E402, F401
add_default_handlers(alc)

View File

@@ -1,5 +1,3 @@
from urllib.parse import urlunparse
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
@@ -8,8 +6,8 @@ from nonebot_plugin_user import User # type: ignore[import-untyped]
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import BindStatus, create_or_update_bind, trigger
from ...utils.avatar import get_avatar
from ...utils.host import HostPage, get_self_netloc
from ...utils.image import get_avatar
from ...utils.render import Bind, render
from ...utils.render.schemas.base import People
from ...utils.screenshot import screenshot
@@ -18,7 +16,7 @@ from .api import Player
from .constant import GAME_TYPE
@alc.assign('bind')
@alc.assign('TOS.bind')
async def _(
nb_user: User,
account: Player,
@@ -60,5 +58,5 @@ async def _(
)
) as page_hash:
await UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
).finish()

View File

@@ -2,7 +2,6 @@ from asyncio import gather
from datetime import timedelta
from http import HTTPStatus
from typing import Literal, NamedTuple
from urllib.parse import urlunparse
from nonebot.adapters import Event
from nonebot.matcher import Matcher
@@ -15,9 +14,9 @@ from nonebot_plugin_user import get_user # type: ignore[import-untyped]
from nonebot_plugin_userinfo import EventUserInfo, UserInfo # type: ignore[import-untyped]
from ...db import query_bind_info, trigger
from ...utils.avatar import get_avatar
from ...utils.exception import RequestError
from ...utils.host import HostPage, get_self_netloc
from ...utils.image import get_avatar
from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics
from ...utils.render import render
from ...utils.render.schemas.base import People, Ranking
@@ -34,7 +33,7 @@ from .constant import GAME_TYPE
def add_special_handlers(
teaid_prefix: Literal['onebot-', 'kook-', 'discord-', 'qqguild-'], match_event: type[Event]
) -> None:
@alc.assign('query')
@alc.assign('TOS.query')
async def _(
event: Event,
target: At | Me,
@@ -98,7 +97,7 @@ except ImportError:
pass
@alc.assign('query')
@alc.assign('TOS.query')
async def _(
event: Event,
matcher: Matcher,
@@ -132,7 +131,7 @@ async def _(
await (message + make_query_text(user_info, game_data)).finish()
@alc.assign('query')
@alc.assign('TOS.query')
async def _(account: Player, event_session: EventSession, event_user_info: UserInfo = EventUserInfo()): # noqa: B008
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
@@ -230,7 +229,7 @@ async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, even
),
)
) as page_hash:
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
def make_query_text(user_info: UserInfoSuccess, game_data: GameData | None) -> UniMessage:

View File

@@ -27,6 +27,10 @@ class MessageFormatError(NeedCatchError):
"""用户发送的消息格式不正确"""
class FallbackError(NeedCatchError):
"""需要回滚至更通用的方法"""
class DoNotCatchError(TetrisStatsError):
"""不应该被捕获的异常基类"""
@@ -35,5 +39,5 @@ class WhatTheFuckError(DoNotCatchError):
"""用于表示不应该出现的情况 ("""
class HandleNotFinishedError(DoNotCatchError):
"""任务没有正常完成处理的错误"""
class NoFallbackError(DoNotCatchError): # 暂时没用 但是先写了
"""没有可用的回退方法"""

View File

@@ -1,26 +1,27 @@
from hashlib import sha256
from ipaddress import IPv4Address, IPv6Address
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, ClassVar, Literal
from fastapi import FastAPI, status
from fastapi.responses import HTMLResponse
from fastapi import FastAPI, Path, status
from fastapi.responses import FileResponse, HTMLResponse, Response
from fastapi.staticfiles import StaticFiles
from nonebot import get_app, get_driver
from nonebot.log import logger
from nonebot_plugin_localstore import get_cache_dir # type: ignore[import-untyped]
from ..config.config import CACHE_PATH
from .image import img_to_png
from .request import Request
from .templates import templates_dir
if TYPE_CHECKING:
from pydantic import IPvAnyAddress
app = get_app()
app: FastAPI = get_app()
driver = get_driver()
global_config = driver.config
cache_dir = get_cache_dir('nonebot_plugin_tetris_stats')
if not isinstance(app, FastAPI):
msg = '本插件需要 FastAPI 驱动器才能运行'
@@ -60,6 +61,22 @@ async def _(page_hash: str) -> HTMLResponse:
return NOT_FOUND
@app.get('/host/resource/tetrio/{resource_type}/{user_id}', status_code=status.HTTP_200_OK)
async def _(
resource_type: Literal['avatars', 'banners'], revision: int, user_id: str = Path(regex=r'^[a-f0-9]{24}$')
) -> Response:
if not (path := CACHE_PATH / 'tetrio' / resource_type / f'{user_id}_{revision}.png').exists():
path.parent.mkdir(parents=True, exist_ok=True)
path.write_bytes(
img_to_png(
await Request.request(
f'https://tetr.io/user-content/{resource_type}/{user_id}.jpg?rv={revision}', is_json=False
)
)
)
return FileResponse(path)
def get_self_netloc() -> str:
host: IPv4Address | IPv6Address | IPvAnyAddress = global_config.host
if isinstance(host, IPv4Address):

View File

@@ -52,3 +52,11 @@ async def get_avatar(user: UserInfo, scheme: Literal['Data URI', 'bytes'], defau
raise TypeError(msg)
return f'data:{Image.MIME[avatar_format]};base64,{b64encode(bot_avatar).decode()}'
return bot_avatar
def img_to_png(image: bytes) -> bytes:
"""将图片转换为 PNG 格式"""
result = BytesIO()
with Image.open(BytesIO(image)) as img:
img.save(result, 'PNG')
return result.getvalue()

View File

@@ -1,19 +0,0 @@
from nonebot.adapters import Bot
def get_platform(bot: Bot) -> str:
try:
from nonebot.adapters.onebot.v12 import Bot as OB12Bot
if isinstance(bot, OB12Bot):
return bot.platform
except ImportError:
pass
try:
from nonebot.adapters.satori import Bot as SaBot
if isinstance(bot, SaBot):
return bot.platform
except ImportError:
pass
return bot.type

View File

@@ -7,6 +7,13 @@ from ...typing import Number
from .base import Avatar
class Badge(BaseModel):
id: str
description: str
group: str | None
receive_at: datetime | None
class User(BaseModel):
id: str
name: str
@@ -23,14 +30,19 @@ class User(BaseModel):
verified: bool
bad_standing: bool
badges: list[str]
badges: list[Badge]
xp: Number
playtime: str
playtime: str | None
join_at: datetime | None
class Statistic(BaseModel):
total: int | None
wins: int | None
class TetraLeagueStatistic(BaseModel):
total: int
wins: int
@@ -44,18 +56,20 @@ class TetraLeague(BaseModel):
glicko: Number
rd: Number
global_rank: int
country_rank: int
global_rank: int | None
country_rank: int | None
pps: Number
apm: Number
adpm: Number
apl: Number
vs: Number
adpl: Number
vs: Number | None
adpl: Number | None
statistic: Statistic
statistic: TetraLeagueStatistic
decaying: bool
class Sprint(BaseModel):

View File

@@ -11,5 +11,5 @@ async def screenshot(url: str) -> bytes:
await browser.new_page(viewport={'width': 3000, 'height': 3000}) as page,
):
await page.goto(url)
await page.wait_for_load_state('networkidle', timeout=5000)
await page.wait_for_load_state('networkidle')
return await page.locator('id=content').screenshot(timeout=5000, type='png')

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = 'nonebot-plugin-tetris-stats'
version = '1.2.11'
version = '1.2.15'
description = '一款基于 NoneBot2 的用于查询 Tetris 相关游戏数据的插件'
authors = ['scdhh <wallfjjd@gmail.com>']
readme = 'README.md'
@@ -129,4 +129,3 @@ quote-style = 'single'
[tool.nonebot]
plugins = ['nonebot_plugin_tetris_stats']
# plugins = ['test_user']