mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
Compare commits
68 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| ce94aee0f4 | |||
| b9c58ae125 | |||
| 92159e93b8 | |||
| f9b11895e2 | |||
| f7c3d493ea | |||
| 4954ab3d60 | |||
| bcca869e72 | |||
| a4247abdad | |||
| 2c1d43601a | |||
| c929c463ec | |||
| 314e1dede3 | |||
| d5b0ef34c5 | |||
| 3d9ef841b1 | |||
| b98871f170 | |||
| 38ab872dd8 | |||
| f44c0baa2e | |||
| 9b8d17577e | |||
| f301bee2b0 | |||
| fbe018e56a | |||
| ab046fe786 | |||
| ce95d8f977 | |||
| fa05b80e61 | |||
| 0ab0d11a98 | |||
| 7f469540b2 | |||
| 21bee29146 | |||
|
|
c2dd9c5d86 | ||
|
|
5927cb2bb5 | ||
|
|
4a4a215b61 | ||
| bfe931d3bf | |||
| b7b152d84d | |||
| b6f6eb1170 | |||
| 934800aae0 | |||
|
|
d19c37e99a | ||
|
|
43167fe9bd | ||
|
|
db8de88667 | ||
| 318b42dbd2 | |||
| af4a9f33b0 | |||
|
|
5e5bc4da2c | ||
|
|
594ea9a76f | ||
|
|
69e9ca7933 | ||
|
|
b1bc111b7a | ||
| 43970f4853 | |||
| 48b200697c | |||
| 1a791f5ef8 | |||
| 9b13a9e87c | |||
| ecad6b8070 | |||
| 1e6932b3de | |||
| 3ef7605e11 | |||
|
|
e8539c15cc | ||
|
|
9ace65f9df | ||
|
|
d727a0bc53 | ||
| 52947556a4 | |||
| 7fe9a6fd3d | |||
| 6dbfd31eab | |||
| 1788d40ed2 | |||
| 18d8e0cdcc | |||
| b37f927be6 | |||
| 314bf4c2f0 | |||
| c9f6817c6a | |||
| 4c7cd00a76 | |||
| b8cf10b45d | |||
| 4ec5c3bde1 | |||
| 270b953bc9 | |||
| 13bd0da592 | |||
| 9545f0b5d0 | |||
| 12f320cbb4 | |||
| 7ff59cfc01 | |||
| 498781f376 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -19,3 +19,4 @@ package-lock.json
|
||||
bot.py
|
||||
TODO
|
||||
*.fish
|
||||
extracted_skin_mino_*
|
||||
|
||||
@@ -5,15 +5,21 @@ require('nonebot_plugin_alconna')
|
||||
require('nonebot_plugin_apscheduler')
|
||||
require('nonebot_plugin_localstore')
|
||||
require('nonebot_plugin_orm')
|
||||
require('nonebot_plugin_session')
|
||||
require('nonebot_plugin_session_orm')
|
||||
require('nonebot_plugin_session')
|
||||
require('nonebot_plugin_user')
|
||||
|
||||
from nonebot_plugin_alconna import namespace # noqa: E402
|
||||
|
||||
with namespace('tetris_stats') as ns:
|
||||
ns.enable_message_cache = False
|
||||
|
||||
from .config.config import migrations # noqa: E402
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name='Tetris Stats',
|
||||
description='一个用于查询 Tetris 相关游戏玩家数据的插件',
|
||||
usage='发送 {游戏名} --help 查询使用方法',
|
||||
usage='发送 tstats --help 查询使用方法',
|
||||
type='application',
|
||||
homepage='https://github.com/A-minos/nonebot-plugin-tetris-stats',
|
||||
extra={
|
||||
@@ -21,5 +27,4 @@ __plugin_meta__ = PluginMetadata(
|
||||
},
|
||||
)
|
||||
|
||||
from . import game_data_processor # noqa: F401, E402
|
||||
from .utils import host # noqa: F401, E402
|
||||
from . import games # noqa: F401, E402
|
||||
|
||||
@@ -46,7 +46,9 @@ def upgrade(name: str = '') -> None: # noqa: C901
|
||||
TimeRemainingColumn,
|
||||
)
|
||||
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseProcessedData # type: ignore[attr-defined]
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import ( # type: ignore[import-untyped]
|
||||
BaseProcessedData,
|
||||
)
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
Base.prepare(autoload_with=op.get_bind())
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Add TETRIO user configuration
|
||||
|
||||
迁移 ID: a1195e989cc6
|
||||
父迁移: b15844837693
|
||||
创建时间: 2024-06-09 04:20:07.819194
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
revision: str = 'a1195e989cc6'
|
||||
down_revision: str | Sequence[str] | None = 'b15844837693'
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_tetriouserconfig',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('query_template', sa.String(length=2), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetriouserconfig')),
|
||||
info={'bind_key': 'nonebot_plugin_tetris_stats'},
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table('nonebot_plugin_tetris_stats_tetriouserconfig')
|
||||
# ### end Alembic commands ###
|
||||
@@ -0,0 +1,71 @@
|
||||
"""Migrate to nonobot-plugin-user
|
||||
|
||||
迁移 ID: b15844837693
|
||||
父迁移: 3c25a5a8c050
|
||||
创建时间: 2024-06-08 02:27:35.227596
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
revision: str = 'b15844837693'
|
||||
down_revision: str | Sequence[str] | None = '3c25a5a8c050'
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_bind', schema=None) as batch_op:
|
||||
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_bind_chat_account')
|
||||
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_bind_chat_platform')
|
||||
|
||||
op.drop_table('nonebot_plugin_tetris_stats_bind')
|
||||
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_bind',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('user_id', sa.Integer(), nullable=False),
|
||||
sa.Column('game_platform', sa.String(length=32), nullable=False),
|
||||
sa.Column('game_account', sa.String(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_bind')),
|
||||
info={'bind_key': 'nonebot_plugin_tetris_stats'},
|
||||
)
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_bind', schema=None) as batch_op:
|
||||
batch_op.create_index(batch_op.f('ix_nonebot_plugin_tetris_stats_bind_user_id'), ['user_id'], unique=False)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_bind', schema=None) as batch_op:
|
||||
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_bind_user_id'))
|
||||
|
||||
op.drop_table('nonebot_plugin_tetris_stats_bind')
|
||||
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_bind',
|
||||
sa.Column('id', sa.INTEGER(), nullable=False),
|
||||
sa.Column('chat_platform', sa.VARCHAR(length=32), nullable=False),
|
||||
sa.Column('chat_account', sa.VARCHAR(), nullable=False),
|
||||
sa.Column('game_platform', sa.VARCHAR(length=32), nullable=False),
|
||||
sa.Column('game_account', sa.VARCHAR(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name='pk_nonebot_plugin_tetris_stats_bind'),
|
||||
)
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_bind', schema=None) as batch_op:
|
||||
batch_op.create_index('ix_nonebot_plugin_tetris_stats_bind_chat_platform', ['chat_platform'], unique=False)
|
||||
batch_op.create_index('ix_nonebot_plugin_tetris_stats_bind_chat_account', ['chat_account'], unique=False)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
@@ -46,7 +46,7 @@ def upgrade(name: str = '') -> None:
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser # type: ignore[import-untyped]
|
||||
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('user_unique_identifier', sa.String(length=32), nullable=True))
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from asyncio import Lock
|
||||
from collections.abc import AsyncGenerator
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import datetime, timezone
|
||||
@@ -7,6 +8,7 @@ from typing import TYPE_CHECKING, Literal, TypeVar, overload
|
||||
from nonebot.exception import FinishedException
|
||||
from nonebot.log import logger
|
||||
from nonebot_plugin_orm import AsyncSession, get_session
|
||||
from nonebot_plugin_user import User # type: ignore[import-untyped]
|
||||
from sqlalchemy import select
|
||||
|
||||
from ..utils.typing import CommandType, GameType
|
||||
@@ -15,9 +17,9 @@ from .models import Bind, TriggerHistoricalData
|
||||
UTC = timezone.utc
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..game_data_processor.io_data_processor.api.models import TETRIOHistoricalData
|
||||
from ..game_data_processor.top_data_processor.api.models import TOPHistoricalData
|
||||
from ..game_data_processor.tos_data_processor.api.models import TOSHistoricalData
|
||||
from ..games.tetrio.api.models import TETRIOHistoricalData
|
||||
from ..games.top.api.models import TOPHistoricalData
|
||||
from ..games.tos.api.models import TOSHistoricalData
|
||||
|
||||
|
||||
class BindStatus(Enum):
|
||||
@@ -27,37 +29,28 @@ class BindStatus(Enum):
|
||||
|
||||
async def query_bind_info(
|
||||
session: AsyncSession,
|
||||
chat_platform: str,
|
||||
chat_account: str,
|
||||
user: User,
|
||||
game_platform: GameType,
|
||||
) -> Bind | None:
|
||||
return (
|
||||
await session.scalars(
|
||||
select(Bind)
|
||||
.where(Bind.chat_platform == chat_platform)
|
||||
.where(Bind.chat_account == chat_account)
|
||||
.where(Bind.game_platform == game_platform)
|
||||
)
|
||||
await session.scalars(select(Bind).where(Bind.user_id == user.id).where(Bind.game_platform == game_platform))
|
||||
).one_or_none()
|
||||
|
||||
|
||||
async def create_or_update_bind(
|
||||
session: AsyncSession,
|
||||
chat_platform: str,
|
||||
chat_account: str,
|
||||
user: User,
|
||||
game_platform: GameType,
|
||||
game_account: str,
|
||||
) -> BindStatus:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
chat_platform=chat_platform,
|
||||
chat_account=chat_account,
|
||||
user=user,
|
||||
game_platform=game_platform,
|
||||
)
|
||||
if bind is None:
|
||||
bind = Bind(
|
||||
chat_platform=chat_platform,
|
||||
chat_account=chat_account,
|
||||
user_id=user.id,
|
||||
game_platform=game_platform,
|
||||
game_account=game_account,
|
||||
)
|
||||
@@ -72,9 +65,11 @@ async def create_or_update_bind(
|
||||
|
||||
T = TypeVar('T', 'TETRIOHistoricalData', 'TOPHistoricalData', 'TOSHistoricalData')
|
||||
|
||||
lock = Lock()
|
||||
|
||||
|
||||
async def anti_duplicate_add(cls: type[T], model: T) -> None:
|
||||
async with get_session() as session:
|
||||
async with lock, get_session() as session:
|
||||
result = (
|
||||
await session.scalars(
|
||||
select(cls)
|
||||
@@ -88,8 +83,8 @@ async def anti_duplicate_add(cls: type[T], model: T) -> None:
|
||||
if i.data == model.data:
|
||||
logger.debug('Anti duplicate successfully')
|
||||
return
|
||||
session.add(model)
|
||||
await session.commit()
|
||||
session.add(model)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
|
||||
@@ -23,9 +23,8 @@ class PydanticType(TypeDecorator):
|
||||
*args: Any,
|
||||
**kwargs: Any,
|
||||
):
|
||||
for i in get_model:
|
||||
models.update(i())
|
||||
self.models = models
|
||||
self.get_model = get_model
|
||||
self._models = models
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
if PYDANTIC_V2:
|
||||
@@ -33,7 +32,7 @@ class PydanticType(TypeDecorator):
|
||||
@override
|
||||
def process_bind_param(self, value: Any | None, dialect: Dialect) -> str:
|
||||
# 将 Pydantic 模型实例转换为 JSON
|
||||
if isinstance(value, tuple(self.pydantic_models)):
|
||||
if isinstance(value, tuple(self.models)):
|
||||
return value.model_dump_json(by_alias=True) # type: ignore[union-attr]
|
||||
raise TypeError
|
||||
else:
|
||||
@@ -41,7 +40,7 @@ class PydanticType(TypeDecorator):
|
||||
@override
|
||||
def process_bind_param(self, value: Any | None, dialect: Dialect) -> str:
|
||||
# 将 Pydantic 模型实例转换为 JSON
|
||||
if isinstance(value, tuple(self.pydantic_models)):
|
||||
if isinstance(value, tuple(self.models)):
|
||||
return value.json(by_alias=True) # type: ignore[union-attr]
|
||||
raise TypeError
|
||||
|
||||
@@ -49,18 +48,25 @@ class PydanticType(TypeDecorator):
|
||||
def process_result_value(self, value: Any | None, dialect: Dialect) -> BaseModel:
|
||||
# 将 JSON 转换回 Pydantic 模型实例
|
||||
if isinstance(value, str | bytes):
|
||||
for i in self.pydantic_models:
|
||||
for i in self.models:
|
||||
try:
|
||||
return type_validate_json(i, value)
|
||||
except ValidationError: # noqa: PERF203
|
||||
...
|
||||
raise ValueError
|
||||
|
||||
@property
|
||||
def models(self) -> tuple[type[BaseModel], ...]:
|
||||
models: set[type[BaseModel]] = set()
|
||||
for i in self.get_model:
|
||||
models.update(i())
|
||||
models.update(self._models)
|
||||
return tuple(models)
|
||||
|
||||
|
||||
class Bind(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
chat_platform: Mapped[str] = mapped_column(String(32), index=True)
|
||||
chat_account: Mapped[str] = mapped_column(index=True)
|
||||
user_id: Mapped[int] = mapped_column(index=True)
|
||||
game_platform: Mapped[GameType] = mapped_column(String(32))
|
||||
game_account: Mapped[str]
|
||||
|
||||
|
||||
@@ -1,44 +0,0 @@
|
||||
from typing import Any
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.exception import FinishedException
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot_plugin_alconna import AlcMatches, AlconnaMatcher, At
|
||||
|
||||
from ..utils.exception import MessageFormatError, NeedCatchError
|
||||
|
||||
|
||||
def add_default_handlers(matcher: type[AlconnaMatcher]) -> None:
|
||||
@matcher.assign('query')
|
||||
async def _(bot: Bot, matcher: Matcher, target: At):
|
||||
if isinstance(target, At) and target.target == bot.self_id:
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
|
||||
@matcher.handle()
|
||||
async def _(matcher: Matcher, account: MessageFormatError):
|
||||
await matcher.finish(str(account))
|
||||
|
||||
@matcher.handle()
|
||||
async def _(matcher: Matcher, matches: AlcMatches):
|
||||
if matches.head_matched and matches.options != {} or matches.main_args == {}:
|
||||
await matcher.finish(
|
||||
(f'{matches.error_info!r}\n' if matches.error_info is not None else '')
|
||||
+ f'输入"{matches.header_result} --help"查看帮助'
|
||||
)
|
||||
|
||||
@matcher.handle()
|
||||
def _(other: Any): # noqa: ANN401, ARG001
|
||||
raise FinishedException
|
||||
|
||||
|
||||
from . import ( # noqa: F401, E402
|
||||
io_data_processor,
|
||||
top_data_processor,
|
||||
tos_data_processor,
|
||||
)
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(matcher: Matcher, exception: NeedCatchError):
|
||||
await matcher.send(str(exception))
|
||||
@@ -1,85 +0,0 @@
|
||||
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
|
||||
from nonebot_plugin_alconna import At, on_alconna
|
||||
|
||||
from ...utils.exception import MessageFormatError
|
||||
from ...utils.typing import Me
|
||||
from .. import add_default_handlers
|
||||
from ..constant import BIND_COMMAND, QUERY_COMMAND
|
||||
from .api import Player
|
||||
from .api.typing import Rank
|
||||
from .constant import USER_ID, USER_NAME
|
||||
|
||||
|
||||
def get_player(user_id_or_name: str) -> Player | MessageFormatError:
|
||||
if USER_ID.match(user_id_or_name):
|
||||
return Player(user_id=user_id_or_name, trust=True)
|
||||
if USER_NAME.match(user_id_or_name):
|
||||
return Player(user_name=user_id_or_name, trust=True)
|
||||
return MessageFormatError('用户名/ID不合法')
|
||||
|
||||
|
||||
alc = on_alconna(
|
||||
Alconna(
|
||||
'io',
|
||||
Option(
|
||||
BIND_COMMAND[0],
|
||||
Args(
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='IO 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN],
|
||||
)
|
||||
),
|
||||
alias=BIND_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='bind',
|
||||
help_text='绑定 IO 账号',
|
||||
),
|
||||
Option(
|
||||
QUERY_COMMAND[0],
|
||||
Args(
|
||||
Arg(
|
||||
'target',
|
||||
At | Me,
|
||||
notice='@想要查询的人 | 自己',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='IO 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
),
|
||||
alias=QUERY_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='query',
|
||||
help_text='查询 IO 游戏信息',
|
||||
),
|
||||
Option(
|
||||
'rank',
|
||||
Args(Arg('rank', Rank, notice='IO 段位')),
|
||||
alias={'Rank', 'RANK', '段位'},
|
||||
compact=True,
|
||||
dest='rank',
|
||||
help_text='查询 IO 段位信息',
|
||||
),
|
||||
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
|
||||
meta=CommandMeta(
|
||||
description='查询 TETR.IO 的信息',
|
||||
example='io绑定scdhh\nio查我\niorankx',
|
||||
compact=True,
|
||||
fuzzy_match=True,
|
||||
),
|
||||
),
|
||||
skip_for_unmatch=False,
|
||||
auto_send_output=True,
|
||||
aliases={'IO'},
|
||||
)
|
||||
|
||||
alc.shortcut('fkosk', {'command': 'io查', 'args': ['我'], 'fuzzy': False, 'humanized': 'An Easter egg!'})
|
||||
|
||||
from . import bind, query, rank # noqa: F401, E402
|
||||
|
||||
add_default_handlers(alc)
|
||||
@@ -1,296 +0,0 @@
|
||||
import contextlib
|
||||
from asyncio import gather
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from hashlib import md5
|
||||
from math import ceil, floor
|
||||
from urllib.parse import urlunparse
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from sqlalchemy import select
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.render import TETRIOInfo, render
|
||||
from ...utils.render.schemas.base import Avatar
|
||||
from ...utils.render.schemas.tetrio_info import Data, Radar, Ranking, TetraLeague, TetraLeagueHistory
|
||||
from ...utils.render.schemas.tetrio_info import User as TemplateUser
|
||||
from ...utils.screenshot import screenshot
|
||||
from ...utils.typing import Me
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
from .api import Player, User, UserInfoSuccess
|
||||
from .api.models import TETRIOHistoricalData
|
||||
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
|
||||
from .api.schemas.user_records import SoloModeRecord, SoloRecord
|
||||
from .constant import GAME_TYPE, TR_MAX, TR_MIN
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
|
||||
game_platform=GAME_TYPE,
|
||||
)
|
||||
if bind is None:
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = UniMessage(CANT_VERIFY_MESSAGE)
|
||||
player = Player(user_id=bind.game_account, trust=True)
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
sprint = user_records.data.records.sprint
|
||||
blitz = user_records.data.records.blitz
|
||||
with contextlib.suppress(TypeError):
|
||||
message += UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record))
|
||||
await message.finish()
|
||||
message += make_query_text(user_info, sprint, blitz)
|
||||
await message.finish()
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
async def _(account: Player, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
user, user_info, user_records = await gather(account.user, account.get_info(), account.get_records())
|
||||
sprint = user_records.data.records.sprint
|
||||
blitz = user_records.data.records.blitz
|
||||
with contextlib.suppress(TypeError):
|
||||
await UniMessage.image(raw=await make_query_image(user, user_info, sprint.record, blitz.record)).finish()
|
||||
await make_query_text(user_info, sprint, blitz).finish()
|
||||
|
||||
|
||||
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
|
||||
value_max = 10 * ceil(max(values) / 10)
|
||||
value_min = 10 * floor(min(values) / 10)
|
||||
return value_max, value_min
|
||||
|
||||
|
||||
def get_split(value_max: int, value_min: int) -> tuple[int, int]:
|
||||
offset = 0
|
||||
overflow = 0
|
||||
|
||||
while True:
|
||||
if (new_max_value := value_max + offset + overflow) > TR_MAX:
|
||||
overflow -= 1
|
||||
continue
|
||||
if (new_min_value := value_min - offset + overflow) < TR_MIN:
|
||||
overflow += 1
|
||||
continue
|
||||
if ((new_max_value - new_min_value) / 40).is_integer():
|
||||
split_value = int((value_max + offset - (value_min - offset)) / 4)
|
||||
break
|
||||
offset += 1
|
||||
return split_value, offset + overflow
|
||||
|
||||
|
||||
def get_specified_point(
|
||||
previous_point: Data,
|
||||
behind_point: Data,
|
||||
point_time: datetime,
|
||||
) -> Data:
|
||||
"""根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据
|
||||
|
||||
Args:
|
||||
previous_point (Data): 前面的数据点
|
||||
behind_point (Data): 后面的数据点
|
||||
point_time (datetime): 要推算的点的位置
|
||||
|
||||
Returns:
|
||||
Data: 要推算的点的数据
|
||||
"""
|
||||
# 求两个点的斜率
|
||||
slope = (behind_point.tr - previous_point.tr) / (
|
||||
datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at)
|
||||
)
|
||||
return Data(
|
||||
record_at=point_time,
|
||||
tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)),
|
||||
)
|
||||
|
||||
|
||||
async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[Data]:
|
||||
today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
forward = timedelta(days=9)
|
||||
start_time = (today - forward).astimezone(UTC)
|
||||
async with get_session() as session:
|
||||
historical_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOHistoricalData)
|
||||
.where(TETRIOHistoricalData.update_time >= start_time)
|
||||
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
|
||||
.where(TETRIOHistoricalData.api_type == 'User Info')
|
||||
)
|
||||
).all()
|
||||
if historical_data:
|
||||
extra = (
|
||||
await session.scalars(
|
||||
select(TETRIOHistoricalData)
|
||||
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
|
||||
.where(TETRIOHistoricalData.api_type == 'User Info')
|
||||
.order_by(TETRIOHistoricalData.id.desc())
|
||||
.where(TETRIOHistoricalData.id < min([i.id for i in historical_data]))
|
||||
.limit(1)
|
||||
)
|
||||
).one_or_none()
|
||||
if extra is not None:
|
||||
historical_data = list(historical_data)
|
||||
historical_data.append(extra)
|
||||
if not historical_data:
|
||||
return [
|
||||
Data(record_at=today - forward, tr=user_info.data.user.league.rating),
|
||||
Data(record_at=today.replace(microsecond=1000), tr=user_info.data.user.league.rating),
|
||||
]
|
||||
histories = [
|
||||
Data(
|
||||
record_at=i.update_time.astimezone(ZoneInfo('Asia/Shanghai')),
|
||||
tr=i.data.data.user.league.rating,
|
||||
)
|
||||
for i in historical_data
|
||||
if isinstance(i.data, UserInfoSuccess) and isinstance(i.data.data.user.league, RatedLeague)
|
||||
]
|
||||
|
||||
# 按照时间排序
|
||||
histories = sorted(histories, key=lambda x: x.record_at)
|
||||
for index, value in enumerate(histories):
|
||||
# 在历史记录里找有没有今天0点后的数据
|
||||
if value.record_at > today:
|
||||
histories = histories[:index] + [
|
||||
get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000))
|
||||
]
|
||||
break
|
||||
else:
|
||||
histories.append(
|
||||
get_specified_point(
|
||||
histories[-1],
|
||||
Data(record_at=user_info.cache.cached_at, tr=user_info.data.user.league.rating),
|
||||
today.replace(microsecond=1000),
|
||||
)
|
||||
)
|
||||
if histories[0].record_at < (today - forward):
|
||||
histories[0] = get_specified_point(
|
||||
histories[0],
|
||||
histories[1],
|
||||
today - forward,
|
||||
)
|
||||
else:
|
||||
histories.insert(0, Data(record_at=today - forward, tr=histories[0].tr))
|
||||
return histories
|
||||
|
||||
|
||||
async def make_query_image(
|
||||
user: User, user_info: UserInfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None
|
||||
) -> bytes:
|
||||
league = user_info.data.user.league
|
||||
if not isinstance(league, RatedLeague) or league.vs is None:
|
||||
raise TypeError
|
||||
user_name = user_info.data.user.username.upper()
|
||||
histories = await query_historical_data(user, user_info)
|
||||
value_max, value_min = get_value_bounds([i.tr for i in histories])
|
||||
split_value, offset = get_split(value_max, value_min)
|
||||
if sprint is not None:
|
||||
duration = timedelta(milliseconds=sprint.endcontext.final_time).total_seconds()
|
||||
sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004
|
||||
else:
|
||||
sprint_value = 'N/A'
|
||||
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
|
||||
async with HostPage(
|
||||
await render(
|
||||
'tetrio/info',
|
||||
TETRIOInfo(
|
||||
user=TemplateUser(
|
||||
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
|
||||
),
|
||||
name=user_name,
|
||||
bio=user_info.data.user.bio,
|
||||
),
|
||||
ranking=Ranking(
|
||||
rating=round(league.glicko, 2),
|
||||
rd=round(league.rd, 2),
|
||||
),
|
||||
tetra_league=TetraLeague(
|
||||
rank=league.rank,
|
||||
tr=round(league.rating, 2),
|
||||
global_rank=league.standing,
|
||||
pps=league.pps,
|
||||
lpm=round(lpm := (league.pps * 24), 2),
|
||||
apm=league.apm,
|
||||
apl=round(league.apm / lpm, 2),
|
||||
vs=league.vs,
|
||||
adpm=round(adpm := (league.vs * 0.6), 2),
|
||||
adpl=round(adpm / lpm, 2),
|
||||
),
|
||||
tetra_league_history=TetraLeagueHistory(
|
||||
data=histories,
|
||||
split_interval=split_value,
|
||||
min_tr=value_min,
|
||||
max_tr=value_max,
|
||||
offset=offset,
|
||||
),
|
||||
radar=Radar(
|
||||
app=(app := (league.apm / (60 * league.pps))),
|
||||
dsps=(dsps := ((league.vs / 100) - (league.apm / 60))),
|
||||
dspp=(dspp := (dsps / league.pps)),
|
||||
ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25,
|
||||
ge=2 * ((app * dsps) / league.pps),
|
||||
),
|
||||
sprint=sprint_value,
|
||||
blitz=blitz_value,
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
|
||||
|
||||
def make_query_text(user_info: UserInfoSuccess, sprint: SoloModeRecord, blitz: SoloModeRecord) -> UniMessage:
|
||||
league = user_info.data.user.league
|
||||
user_name = user_info.data.user.username.upper()
|
||||
message = ''
|
||||
if isinstance(league, NeverPlayedLeague):
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if isinstance(league, NeverRatedLeague):
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league.rank == 'z':
|
||||
message += f'用户 {user_name} 暂无段位, {round(league.rating,2)} TR'
|
||||
else:
|
||||
message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})'
|
||||
message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:'
|
||||
lpm = league.pps * 24
|
||||
message += f"\nL'PM: {round(lpm, 2)} ( {league.pps} pps )"
|
||||
message += f'\nAPM: {league.apm} ( x{round(league.apm/lpm,2)} )'
|
||||
if league.vs is not None:
|
||||
adpm = league.vs * 0.6
|
||||
message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )'
|
||||
if sprint.record is not None:
|
||||
message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
|
||||
message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
|
||||
if blitz.record is not None:
|
||||
message += f'\nBlitz: {blitz.record.endcontext.score}'
|
||||
message += f' ( #{blitz.rank} )' if blitz.rank is not None else ''
|
||||
return UniMessage(message)
|
||||
@@ -1,170 +0,0 @@
|
||||
from asyncio import gather
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.typing import Me
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
from .api import Player
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
def add_special_handlers(
|
||||
teaid_prefix: Literal['onebot-', 'kook-', 'discord-', 'qqguild-'], match_event: type[Event]
|
||||
) -> None:
|
||||
@alc.assign('query')
|
||||
async def _(event: Event, target: At | Me, event_session: EventSession):
|
||||
if isinstance(event, match_event):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
await (
|
||||
await make_query_text(
|
||||
Player(
|
||||
teaid=f'{teaid_prefix}{target.target}'
|
||||
if isinstance(target, At)
|
||||
else f'{teaid_prefix}{event.get_user_id()}',
|
||||
trust=True,
|
||||
)
|
||||
)
|
||||
).finish()
|
||||
|
||||
|
||||
try:
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OB11MessageEvent
|
||||
|
||||
add_special_handlers('onebot-', OB11MessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.qq.event import GuildMessageEvent as QQGuildMessageEvent
|
||||
from nonebot.adapters.qq.event import QQMessageEvent
|
||||
|
||||
add_special_handlers('qqguild-', QQGuildMessageEvent)
|
||||
add_special_handlers('onebot-', QQMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.kaiheila.event import MessageEvent as KookMessageEvent
|
||||
|
||||
add_special_handlers('kook-', KookMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.discord import MessageEvent as DiscordMessageEvent
|
||||
|
||||
add_special_handlers('discord-', DiscordMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
|
||||
game_platform=GAME_TYPE,
|
||||
)
|
||||
if bind is None:
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = CANT_VERIFY_MESSAGE
|
||||
await (message + await make_query_text(Player(teaid=bind.game_account, trust=True))).finish()
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
async def _(account: Player, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
await (await make_query_text(account)).finish()
|
||||
|
||||
|
||||
@dataclass
|
||||
class GameData:
|
||||
game_num: int
|
||||
metrics: TetrisMetricsProWithLPMADPM
|
||||
|
||||
|
||||
async def get_game_data(player: Player, query_num: int = 50) -> GameData | None:
|
||||
"""获取游戏数据"""
|
||||
user_profile = await player.get_profile()
|
||||
if user_profile.data == []:
|
||||
return None
|
||||
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = 0.0
|
||||
num = 0
|
||||
for i in user_profile.data:
|
||||
# 排除单人局和时间为0的游戏
|
||||
# 茶: 不计算没挖掘的局, 即使apm和lpm也如此
|
||||
if i.num_players == 1 or i.time == 0 or i.dig is None:
|
||||
continue
|
||||
# 加权计算
|
||||
time = i.time / 1000
|
||||
lpm = 24 * (i.pieces / time)
|
||||
apm = (i.attack / time) * 60
|
||||
adpm = ((i.attack + i.dig) / time) * 60
|
||||
weighted_total_lpm += lpm * time
|
||||
weighted_total_apm += apm * time
|
||||
weighted_total_adpm += adpm * time
|
||||
total_time += time
|
||||
num += 1
|
||||
if num >= query_num:
|
||||
break
|
||||
if num == 0:
|
||||
return None
|
||||
# TODO: 如果有效局数小于 {查询数} , 并且没有无dig信息的局, 且 user_profile.data 内有{请求数}个局, 则继续往前获取信息
|
||||
metrics = get_metrics(
|
||||
lpm=weighted_total_lpm / total_time, apm=weighted_total_apm / total_time, adpm=weighted_total_adpm / total_time
|
||||
)
|
||||
lpm = weighted_total_lpm / total_time
|
||||
apm = weighted_total_apm / total_time
|
||||
adpm = weighted_total_adpm / total_time
|
||||
return GameData(game_num=num, metrics=metrics)
|
||||
|
||||
|
||||
async def make_query_text(player: Player) -> UniMessage:
|
||||
user_info, game_data = await gather(player.get_info(), get_game_data(player))
|
||||
user_data = user_info.data
|
||||
message = f'用户 {user_data.name} ({user_data.teaid}) '
|
||||
if user_data.ranked_games == '0':
|
||||
message += '暂无段位统计数据'
|
||||
else:
|
||||
message += f', 段位分 {round(float(user_data.rating_now),2)}±{round(float(user_data.rd_now),2)} ({round(float(user_data.vol_now),2)}) '
|
||||
if game_data is None:
|
||||
message += ', 暂无游戏数据'
|
||||
else:
|
||||
message += f', 最近 {game_data.game_num} 局数据'
|
||||
message += f"\nL'PM: {game_data.metrics.lpm} ( {game_data.metrics.pps} pps )"
|
||||
message += f'\nAPM: {game_data.metrics.apm} ( x{game_data.metrics.apl} )'
|
||||
message += f'\nADPM: {game_data.metrics.adpm} ( x{game_data.metrics.adpl} ) ( {game_data.metrics.vs}vs )'
|
||||
message += f'\n40L: {float(user_data.pb_sprint)/1000:.2f}s' if user_data.pb_sprint != '2147483647' else ''
|
||||
message += f'\nMarathon: {user_data.pb_marathon}' if user_data.pb_marathon != '0' else ''
|
||||
message += f'\nChallenge: {user_data.pb_challenge}' if user_data.pb_challenge != '0' else ''
|
||||
return UniMessage(message)
|
||||
53
nonebot_plugin_tetris_stats/games/__init__.py
Normal file
53
nonebot_plugin_tetris_stats/games/__init__.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from collections.abc import Callable
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot.typing import T_Handler
|
||||
from nonebot_plugin_alconna import AlcMatches, Alconna, At, CommandMeta, on_alconna
|
||||
|
||||
from .. import ns
|
||||
from ..utils.exception import MessageFormatError, NeedCatchError
|
||||
|
||||
alc = on_alconna(
|
||||
Alconna(
|
||||
['tetris-stats', 'tstats'],
|
||||
namespace=ns,
|
||||
meta=CommandMeta(
|
||||
description='俄罗斯方块相关游戏数据查询',
|
||||
fuzzy_match=True,
|
||||
),
|
||||
),
|
||||
skip_for_unmatch=False,
|
||||
auto_send_output=True,
|
||||
use_origin=True,
|
||||
)
|
||||
|
||||
|
||||
def add_block_handlers(handler: Callable[[T_Handler], T_Handler]) -> None:
|
||||
@handler
|
||||
async def _(bot: Bot, matcher: Matcher, target: At):
|
||||
if isinstance(target, At) and target.target == bot.self_id:
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
|
||||
|
||||
from . import tetrio, top, tos # noqa: F401, E402
|
||||
|
||||
|
||||
@alc.handle()
|
||||
async def _(matcher: Matcher, account: MessageFormatError):
|
||||
await matcher.finish(str(account))
|
||||
|
||||
|
||||
@alc.handle()
|
||||
async def _(matcher: Matcher, matches: AlcMatches):
|
||||
if matches.head_matched and matches.options != {} or matches.main_args == {}:
|
||||
await matcher.finish(
|
||||
(f'{matches.error_info!r}\n' if matches.error_info is not None else '')
|
||||
+ f'输入"{matches.header_result} --help"查看帮助'
|
||||
)
|
||||
|
||||
|
||||
@run_postprocessor
|
||||
async def _(matcher: Matcher, exception: NeedCatchError):
|
||||
await matcher.send(str(exception))
|
||||
79
nonebot_plugin_tetris_stats/games/tetrio/__init__.py
Normal file
79
nonebot_plugin_tetris_stats/games/tetrio/__init__.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from arclet.alconna import Arg, ArgFlag, Args, Option, Subcommand
|
||||
from nonebot_plugin_alconna import At
|
||||
|
||||
from ...utils.exception import MessageFormatError
|
||||
from ...utils.typing import Me
|
||||
from .. import add_block_handlers, alc
|
||||
from .api import Player
|
||||
from .api.typing import ValidRank
|
||||
from .constant import USER_ID, USER_NAME
|
||||
from .typing import Template
|
||||
|
||||
|
||||
def get_player(user_id_or_name: str) -> Player | MessageFormatError:
|
||||
if USER_ID.match(user_id_or_name):
|
||||
return Player(user_id=user_id_or_name, trust=True)
|
||||
if USER_NAME.match(user_id_or_name):
|
||||
return Player(user_name=user_id_or_name, trust=True)
|
||||
return MessageFormatError('用户名/ID不合法')
|
||||
|
||||
|
||||
alc.command.add(
|
||||
Subcommand(
|
||||
'TETR.IO',
|
||||
Subcommand(
|
||||
'bind',
|
||||
Args(
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='TETR.IO 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN],
|
||||
)
|
||||
),
|
||||
help_text='绑定 TETR.IO 账号',
|
||||
),
|
||||
Subcommand(
|
||||
'query',
|
||||
Args(
|
||||
Arg(
|
||||
'target',
|
||||
At | Me,
|
||||
notice='@想要查询的人 / 自己',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='TETR.IO 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
),
|
||||
Option(
|
||||
'--template',
|
||||
Arg('template', Template),
|
||||
alias=['-T'],
|
||||
help_text='要使用的查询模板',
|
||||
),
|
||||
help_text='查询 TETR.IO 游戏信息',
|
||||
),
|
||||
Subcommand(
|
||||
'rank',
|
||||
Args(Arg('rank', ValidRank, notice='TETR.IO 段位')),
|
||||
help_text='查询 TETR.IO 段位信息',
|
||||
),
|
||||
dest='TETRIO',
|
||||
help_text='TETR.IO 游戏相关指令',
|
||||
)
|
||||
)
|
||||
|
||||
alc.shortcut('(?i:io)(?i:绑|绑定|bind)', {'command': 'tstats TETR.IO bind', 'humanized': 'io绑定'})
|
||||
alc.shortcut('(?i:io)(?i:查|查询|query|stats)', {'command': 'tstats TETR.IO query', 'humanized': 'io查'})
|
||||
|
||||
alc.shortcut(
|
||||
'fkosk', {'command': 'tstats TETR.IO query', 'args': ['我'], 'fuzzy': False, 'humanized': 'An Easter egg!'}
|
||||
)
|
||||
|
||||
add_block_handlers(alc.assign('TETRIO.query'))
|
||||
|
||||
from . import bind, query, rank # noqa: F401, E402
|
||||
@@ -15,3 +15,8 @@ class TETRIOHistoricalData(MappedAsDataclass, Model):
|
||||
api_type: Mapped[Literal['User Info', 'User Records']] = mapped_column(String(16), index=True)
|
||||
data: Mapped[SuccessModel] = mapped_column(PydanticType(get_model=[SuccessModel.__subclasses__], models=set()))
|
||||
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
|
||||
|
||||
|
||||
class TETRIOUserConfig(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
query_template: Mapped[Literal['v1', 'v2']] = mapped_column(String(2))
|
||||
@@ -43,7 +43,7 @@ class Player:
|
||||
if self.user_id is not None:
|
||||
return self.user_id
|
||||
if self.user_name is not None:
|
||||
return self.user_name
|
||||
return self.user_name.lower()
|
||||
msg = 'Invalid user'
|
||||
raise ValueError(msg)
|
||||
|
||||
@@ -36,7 +36,7 @@ class Garbage(BaseModel):
|
||||
sent: int
|
||||
received: int
|
||||
attack: int | None = None
|
||||
cleared: int
|
||||
cleared: int | None = None
|
||||
|
||||
|
||||
class Finesse(BaseModel):
|
||||
@@ -96,7 +96,7 @@ class MultiRecord(_Record):
|
||||
|
||||
|
||||
class SoloModeRecord(BaseModel):
|
||||
record: SoloRecord
|
||||
record: SoloRecord | None = None
|
||||
rank: int | None = None
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Literal
|
||||
|
||||
Rank = Literal[
|
||||
ValidRank = Literal[
|
||||
'x',
|
||||
'u',
|
||||
'ss',
|
||||
@@ -18,5 +18,6 @@ Rank = Literal[
|
||||
'c-',
|
||||
'd+',
|
||||
'd',
|
||||
'z', # 未定级
|
||||
]
|
||||
|
||||
Rank = ValidRank | Literal['z'] # 未定级
|
||||
@@ -1,17 +1,17 @@
|
||||
from asyncio import gather
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlunparse
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import User # type: ignore[import-untyped]
|
||||
from nonebot_plugin_userinfo import BotUserInfo, UserInfo # type: ignore[import-untyped]
|
||||
|
||||
from ...db import BindStatus, create_or_update_bind, trigger
|
||||
from ...utils.avatar import get_avatar
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.image import get_avatar
|
||||
from ...utils.render import Bind, render
|
||||
from ...utils.render.schemas.base import Avatar, People
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -20,33 +20,32 @@ from .api import Player
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
@alc.assign('bind')
|
||||
async def _(bot: Bot, event: Event, account: Player, event_session: EventSession, bot_info: UserInfo = BotUserInfo()): # noqa: B008
|
||||
@alc.assign('TETRIO.bind')
|
||||
async def _(nb_user: User, account: Player, event_session: EventSession, bot_info: UserInfo = BotUserInfo()): # noqa: B008
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='bind',
|
||||
command_args=[],
|
||||
):
|
||||
user = await account.user
|
||||
user, user_info = await gather(account.user, account.get_info())
|
||||
async with get_session() as session:
|
||||
bind_status = await create_or_update_bind(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=event.get_user_id(),
|
||||
user=nb_user,
|
||||
game_platform=GAME_TYPE,
|
||||
game_account=user.unique_identifier,
|
||||
)
|
||||
user_info = await account.get_info()
|
||||
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
|
||||
netloc = get_self_netloc()
|
||||
async with HostPage(
|
||||
await render(
|
||||
'binding',
|
||||
'v1/binding',
|
||||
Bind(
|
||||
platform='TETR.IO',
|
||||
status='unknown',
|
||||
user=People(
|
||||
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(type='identicon', hash=md5(user_info.data.user.id.encode()).hexdigest()), # noqa: S324
|
||||
name=user_info.data.user.username.upper(),
|
||||
@@ -59,6 +58,4 @@ async def _(bot: Bot, event: Event, account: Player, event_session: EventSession
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
await UniMessage.image(
|
||||
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
).finish()
|
||||
await UniMessage.image(raw=await screenshot(f'http://{netloc}/host/{page_hash}.html')).finish()
|
||||
549
nonebot_plugin_tetris_stats/games/tetrio/query.py
Normal file
549
nonebot_plugin_tetris_stats/games/tetrio/query.py
Normal file
@@ -0,0 +1,549 @@
|
||||
from asyncio import gather
|
||||
from collections import defaultdict
|
||||
from datetime import date, datetime, timedelta, timezone
|
||||
from hashlib import md5
|
||||
from math import ceil, floor
|
||||
from typing import ClassVar, TypeVar, overload
|
||||
from urllib.parse import urlencode
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from aiofiles import open
|
||||
from nonebot import get_driver
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.compat import model_dump, type_validate_json
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped]
|
||||
from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped]
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user # type: ignore[import-untyped]
|
||||
from sqlalchemy import select
|
||||
from zstandard import ZstdDecompressor
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.exception import FallbackError
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import TetrisMetricsProWithPPSVS, get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.base import Avatar, Ranking
|
||||
from ...utils.render.schemas.tetrio_info import Data, Radar, TetraLeague, TetraLeagueHistory
|
||||
from ...utils.render.schemas.tetrio_info import Info as V1TemplateInfo
|
||||
from ...utils.render.schemas.tetrio_info import User as V1TemplateUser
|
||||
from ...utils.render.schemas.tetrio_info_v2 import Badge, Blitz, Sprint, Statistic, TetraLeagueStatistic, Zen
|
||||
from ...utils.render.schemas.tetrio_info_v2 import Info as V2TemplateInfo
|
||||
from ...utils.render.schemas.tetrio_info_v2 import TetraLeague as V2TemplateTetraLeague
|
||||
from ...utils.render.schemas.tetrio_info_v2 import User as V2TemplateUser
|
||||
from ...utils.screenshot import screenshot
|
||||
from ...utils.typing import Me, Number
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
from .api import Player, User, UserInfoSuccess
|
||||
from .api.models import TETRIOHistoricalData
|
||||
from .api.schemas.tetra_league import TetraLeagueSuccess
|
||||
from .api.schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague
|
||||
from .api.schemas.user_records import SoloModeRecord, UserRecordsSuccess
|
||||
from .constant import GAME_TYPE, TR_MAX, TR_MIN
|
||||
from .model import IORank
|
||||
from .typing import Template
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@alc.assign('TETRIO.query')
|
||||
async def _(
|
||||
event: Event,
|
||||
matcher: Matcher,
|
||||
target: At | Me,
|
||||
event_session: EventSession,
|
||||
template: Template | None = None,
|
||||
):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
user=await get_user(
|
||||
event_session.platform, target.target if isinstance(target, At) else event.get_user_id()
|
||||
),
|
||||
game_platform=GAME_TYPE,
|
||||
)
|
||||
if bind is None:
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = UniMessage(CANT_VERIFY_MESSAGE)
|
||||
player = Player(user_id=bind.game_account, trust=True)
|
||||
await (message + (await make_query_result(player, template or 'v1'))).finish()
|
||||
|
||||
|
||||
@alc.assign('TETRIO.query')
|
||||
async def _(account: Player, event_session: EventSession, template: Template | None = None):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
await (await make_query_result(account, template or 'v1')).finish()
|
||||
|
||||
|
||||
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
|
||||
value_max = 10 * ceil(max(values) / 10)
|
||||
value_min = 10 * floor(min(values) / 10)
|
||||
return value_max, value_min
|
||||
|
||||
|
||||
def get_split(value_max: int, value_min: int) -> tuple[int, int]:
|
||||
offset = 0
|
||||
overflow = 0
|
||||
|
||||
while True:
|
||||
if (new_max_value := value_max + offset + overflow) > TR_MAX:
|
||||
overflow -= 1
|
||||
continue
|
||||
if (new_min_value := value_min - offset + overflow) < TR_MIN:
|
||||
overflow += 1
|
||||
continue
|
||||
if ((new_max_value - new_min_value) / 40).is_integer():
|
||||
split_value = int((value_max + offset - (value_min - offset)) / 4)
|
||||
break
|
||||
offset += 1
|
||||
return split_value, offset + overflow
|
||||
|
||||
|
||||
def get_specified_point(
|
||||
previous_point: Data,
|
||||
behind_point: Data,
|
||||
point_time: datetime,
|
||||
) -> Data:
|
||||
"""根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据
|
||||
|
||||
Args:
|
||||
previous_point (Data): 前面的数据点
|
||||
behind_point (Data): 后面的数据点
|
||||
point_time (datetime): 要推算的点的位置
|
||||
|
||||
Returns:
|
||||
Data: 要推算的点的数据
|
||||
"""
|
||||
# 求两个点的斜率
|
||||
slope = (behind_point.tr - previous_point.tr) / (
|
||||
datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at)
|
||||
)
|
||||
return Data(
|
||||
record_at=point_time,
|
||||
tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)),
|
||||
)
|
||||
|
||||
|
||||
async def query_historical_data(user: User, user_info: UserInfoSuccess) -> list[Data]:
|
||||
today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
forward = timedelta(days=9)
|
||||
start_time = (today - forward).astimezone(UTC)
|
||||
async with get_session() as session:
|
||||
historical_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOHistoricalData)
|
||||
.where(TETRIOHistoricalData.update_time >= start_time)
|
||||
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
|
||||
.where(TETRIOHistoricalData.api_type == 'User Info')
|
||||
)
|
||||
).all()
|
||||
if historical_data:
|
||||
extra = (
|
||||
await session.scalars(
|
||||
select(TETRIOHistoricalData)
|
||||
.where(TETRIOHistoricalData.user_unique_identifier == user.unique_identifier)
|
||||
.where(TETRIOHistoricalData.api_type == 'User Info')
|
||||
.order_by(TETRIOHistoricalData.id.desc())
|
||||
.where(TETRIOHistoricalData.id < min([i.id for i in historical_data]))
|
||||
.limit(1)
|
||||
)
|
||||
).one_or_none()
|
||||
if extra is not None:
|
||||
historical_data = list(historical_data)
|
||||
historical_data.append(extra)
|
||||
full_export_data = FullExport.get_data(user.unique_identifier)
|
||||
if not historical_data and not full_export_data:
|
||||
return [
|
||||
Data(record_at=today - forward, tr=user_info.data.user.league.rating),
|
||||
Data(record_at=today.replace(microsecond=1000), tr=user_info.data.user.league.rating),
|
||||
]
|
||||
histories = [
|
||||
Data(
|
||||
record_at=i.update_time.astimezone(ZoneInfo('Asia/Shanghai')),
|
||||
tr=i.data.data.user.league.rating,
|
||||
)
|
||||
for i in historical_data
|
||||
if isinstance(i.data, UserInfoSuccess) and isinstance(i.data.data.user.league, RatedLeague)
|
||||
] + full_export_data
|
||||
|
||||
# 按照时间排序
|
||||
histories = sorted(histories, key=lambda x: x.record_at)
|
||||
for index, value in enumerate(histories):
|
||||
# 在历史记录里找有没有今天0点后的数据, 并且至少要有两个数据点
|
||||
if value.record_at > today and len(histories) >= 2: # noqa: PLR2004
|
||||
histories = histories[:index] + [
|
||||
get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000))
|
||||
]
|
||||
break
|
||||
else:
|
||||
histories.append(
|
||||
get_specified_point(
|
||||
histories[-1],
|
||||
Data(record_at=user_info.cache.cached_at, tr=user_info.data.user.league.rating),
|
||||
today.replace(microsecond=1000),
|
||||
)
|
||||
)
|
||||
if histories[0].record_at < (today - forward):
|
||||
histories[0] = get_specified_point(
|
||||
histories[0],
|
||||
histories[1],
|
||||
today - forward,
|
||||
)
|
||||
else:
|
||||
histories.insert(0, Data(record_at=today - forward, tr=histories[0].tr))
|
||||
return histories
|
||||
|
||||
|
||||
L = TypeVar('L', NeverPlayedLeague, NeverRatedLeague, RatedLeague)
|
||||
|
||||
|
||||
@overload
|
||||
def get_league(user_info: UserInfoSuccess, league_type: type[L]) -> L: ...
|
||||
@overload
|
||||
def get_league(
|
||||
user_info: UserInfoSuccess, league_type: None = None
|
||||
) -> NeverPlayedLeague | NeverRatedLeague | RatedLeague: ...
|
||||
def get_league(
|
||||
user_info: UserInfoSuccess, league_type: type[L] | None = None
|
||||
) -> L | NeverPlayedLeague | NeverRatedLeague | RatedLeague:
|
||||
league = user_info.data.user.league
|
||||
if league_type is None:
|
||||
return league
|
||||
if isinstance(league, league_type):
|
||||
return league
|
||||
raise FallbackError
|
||||
|
||||
|
||||
def get_sprint(user_records: UserRecordsSuccess) -> SoloModeRecord:
|
||||
return user_records.data.records.sprint
|
||||
|
||||
|
||||
def get_blitz(user_records: UserRecordsSuccess) -> SoloModeRecord:
|
||||
return user_records.data.records.blitz
|
||||
|
||||
|
||||
async def make_query_image_v1(player: Player) -> bytes:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info, RatedLeague)
|
||||
sprint, blitz = get_sprint(user_records).record, get_blitz(user_records).record
|
||||
if league.vs is None:
|
||||
raise FallbackError
|
||||
histories = await query_historical_data(user, user_info)
|
||||
value_max, value_min = get_value_bounds([i.tr for i in histories])
|
||||
split_value, offset = get_split(value_max, value_min)
|
||||
if sprint is not None:
|
||||
duration = timedelta(milliseconds=sprint.endcontext.final_time).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
else:
|
||||
sprint_value = 'N/A'
|
||||
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
|
||||
netloc = get_self_netloc()
|
||||
async with HostPage(
|
||||
page=await render(
|
||||
'v1/tetrio/info',
|
||||
V1TemplateInfo(
|
||||
user=V1TemplateUser(
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
|
||||
),
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.user.bio,
|
||||
),
|
||||
ranking=Ranking(
|
||||
rating=round(league.glicko, 2),
|
||||
rd=round(league.rd, 2),
|
||||
),
|
||||
tetra_league=TetraLeague(
|
||||
rank=league.rank,
|
||||
tr=round(league.rating, 2),
|
||||
global_rank=league.standing,
|
||||
pps=league.pps,
|
||||
lpm=round(lpm := (league.pps * 24), 2),
|
||||
apm=league.apm,
|
||||
apl=round(league.apm / lpm, 2),
|
||||
vs=league.vs,
|
||||
adpm=round(adpm := (league.vs * 0.6), 2),
|
||||
adpl=round(adpm / lpm, 2),
|
||||
),
|
||||
tetra_league_history=TetraLeagueHistory(
|
||||
data=histories,
|
||||
split_interval=split_value,
|
||||
min_tr=value_min,
|
||||
max_tr=value_max,
|
||||
offset=offset,
|
||||
),
|
||||
radar=Radar(
|
||||
app=(app := (league.apm / (60 * league.pps))),
|
||||
dsps=(dsps := ((league.vs / 100) - (league.apm / 60))),
|
||||
dspp=(dspp := (dsps / league.pps)),
|
||||
ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25,
|
||||
ge=2 * ((app * dsps) / league.pps),
|
||||
),
|
||||
sprint=sprint_value,
|
||||
blitz=blitz_value,
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{netloc}/host/{page_hash}.html')
|
||||
|
||||
|
||||
N = TypeVar('N', int, float)
|
||||
|
||||
|
||||
def handling_special_value(value: N) -> N | None:
|
||||
return value if value != -1 else None
|
||||
|
||||
|
||||
async def make_query_image_v2(player: Player) -> bytes:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info)
|
||||
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
|
||||
|
||||
if sprint.record is not None:
|
||||
duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
else:
|
||||
sprint_value = 'N/A'
|
||||
|
||||
play_time: str | None
|
||||
if (game_time := handling_special_value(user_info.data.user.gametime)) is not None:
|
||||
if game_time // 3600 > 0:
|
||||
play_time = f'{game_time//3600:.0f}h {game_time % 3600 // 60:.0f}m {game_time % 60:.0f}s'
|
||||
elif game_time // 60 > 0:
|
||||
play_time = f'{game_time//60:.0f}m {game_time % 60:.0f}s'
|
||||
else:
|
||||
play_time = f'{game_time:.0f}s'
|
||||
else:
|
||||
play_time = game_time
|
||||
netloc = get_self_netloc()
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v2/tetrio/info',
|
||||
V2TemplateInfo(
|
||||
user=V2TemplateUser(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.user.bio,
|
||||
banner=f'http://{netloc}/host/resource/tetrio/banners/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
|
||||
if user_info.data.user.banner_revision is not None and user_info.data.user.banner_revision != 0
|
||||
else None,
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": user_info.data.user.avatar_revision})}'
|
||||
if user_info.data.user.avatar_revision is not None
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
hash=md5(user_info.data.user.id.encode()).hexdigest(), # noqa: S324
|
||||
),
|
||||
badges=[
|
||||
Badge(
|
||||
id=i.id,
|
||||
description=i.label,
|
||||
group=i.group,
|
||||
receive_at=i.ts if isinstance(i.ts, datetime) else None,
|
||||
)
|
||||
for i in user_info.data.user.badges
|
||||
],
|
||||
country=user_info.data.user.country,
|
||||
xp=user_info.data.user.xp,
|
||||
friend_count=user_info.data.user.friend_count or 0,
|
||||
supporter_tier=user_info.data.user.supporter_tier,
|
||||
bad_standing=user_info.data.user.badstanding or False,
|
||||
verified=user_info.data.user.verified,
|
||||
playtime=play_time,
|
||||
join_at=user_info.data.user.ts,
|
||||
),
|
||||
tetra_league=V2TemplateTetraLeague(
|
||||
rank=league.rank,
|
||||
highest_rank=league.bestrank,
|
||||
tr=round(league.rating, 2),
|
||||
glicko=round(league.glicko, 2),
|
||||
rd=round(league.rd, 2),
|
||||
global_rank=handling_special_value(league.standing),
|
||||
country_rank=handling_special_value(league.standing_local),
|
||||
pps=(
|
||||
metrics := get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
|
||||
if league.vs is not None
|
||||
else get_metrics(pps=league.pps, apm=league.apm)
|
||||
).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
|
||||
adpl=metrics.adpl if isinstance(metrics, TetrisMetricsProWithPPSVS) else None,
|
||||
statistic=TetraLeagueStatistic(
|
||||
total=league.gamesplayed,
|
||||
wins=league.gameswon,
|
||||
),
|
||||
)
|
||||
if isinstance(league, RatedLeague)
|
||||
else None,
|
||||
statistic=Statistic(
|
||||
total=handling_special_value(user_info.data.user.gamesplayed),
|
||||
wins=handling_special_value(user_info.data.user.gameswon),
|
||||
),
|
||||
sprint=Sprint(
|
||||
time=sprint_value,
|
||||
global_rank=sprint.rank,
|
||||
play_at=sprint.record.ts,
|
||||
)
|
||||
if sprint.record is not None
|
||||
else None,
|
||||
blitz=Blitz(
|
||||
score=blitz.record.endcontext.score,
|
||||
global_rank=blitz.rank,
|
||||
play_at=blitz.record.ts,
|
||||
)
|
||||
if blitz.record is not None
|
||||
else None,
|
||||
zen=Zen.model_validate(model_dump(user_records.data.zen)),
|
||||
),
|
||||
),
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{netloc}/host/{page_hash}.html')
|
||||
|
||||
|
||||
async def make_query_text(player: Player) -> UniMessage:
|
||||
user, user_info, user_records = await gather(player.user, player.get_info(), player.get_records())
|
||||
league = get_league(user_info)
|
||||
sprint, blitz = get_sprint(user_records), get_blitz(user_records)
|
||||
|
||||
user_name = user.name.upper()
|
||||
|
||||
message = ''
|
||||
if isinstance(league, NeverPlayedLeague):
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if isinstance(league, NeverRatedLeague):
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league.rank == 'z':
|
||||
message += f'用户 {user_name} 暂无段位, {round(league.rating,2)} TR'
|
||||
else:
|
||||
message += f'{league.rank.upper()} 段用户 {user_name} {round(league.rating,2)} TR (#{league.standing})'
|
||||
message += f', 段位分 {round(league.glicko,2)}±{round(league.rd,2)}, 最近十场的数据:'
|
||||
metrics = (
|
||||
get_metrics(pps=league.pps, apm=league.apm, vs=league.vs)
|
||||
if league.vs is not None
|
||||
else get_metrics(pps=league.pps, apm=league.apm)
|
||||
)
|
||||
message += f"\nL'PM: {metrics.lpm} ( {metrics.pps} pps )"
|
||||
message += f'\nAPM: {metrics.apm} ( x{metrics.apl} )'
|
||||
if isinstance(metrics, TetrisMetricsProWithPPSVS):
|
||||
message += f'\nADPM: {metrics.adpm} ( x{metrics.adpl} ) ( {metrics.vs}vs )'
|
||||
if sprint.record is not None:
|
||||
message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
|
||||
message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
|
||||
if blitz.record is not None:
|
||||
message += f'\nBlitz: {blitz.record.endcontext.score}'
|
||||
message += f' ( #{blitz.rank} )' if blitz.rank is not None else ''
|
||||
return UniMessage(message)
|
||||
|
||||
|
||||
async def make_query_result(player: Player, template: Template) -> UniMessage:
|
||||
try:
|
||||
if template == 'v1':
|
||||
return UniMessage.image(raw=await make_query_image_v1(player))
|
||||
if template == 'v2':
|
||||
return UniMessage.image(raw=await make_query_image_v2(player))
|
||||
except FallbackError:
|
||||
...
|
||||
return await make_query_text(player)
|
||||
|
||||
|
||||
class FullExport:
|
||||
cache: ClassVar[defaultdict[str, set[tuple[datetime, Number]]]] = defaultdict(set)
|
||||
latest_update: ClassVar[date | None] = None
|
||||
|
||||
@classmethod
|
||||
async def init(cls) -> None:
|
||||
async with get_session() as session:
|
||||
full_exports = (await session.scalars(select(IORank).where(IORank.update_time >= cls.start_time()))).all()
|
||||
await gather(
|
||||
*[
|
||||
cls._load(update_time, file_hash)
|
||||
for file_hash, update_time in {
|
||||
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
|
||||
}.items()
|
||||
]
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def update(cls) -> None:
|
||||
if cls.latest_update == datetime.now(tz=ZoneInfo('Asia/Shanghai')).date():
|
||||
return
|
||||
start_time = cls.start_time()
|
||||
for i in cls.cache:
|
||||
cls.cache[i] = {j for j in cls.cache[i] if j[0] >= start_time}
|
||||
latest_time = max(cls.cache)
|
||||
async with get_session() as session:
|
||||
full_exports = (await session.scalars(select(IORank).where(IORank.update_time > latest_time))).all()
|
||||
await gather(
|
||||
*[
|
||||
cls._load(update_time, file_hash)
|
||||
for file_hash, update_time in {
|
||||
i.file_hash: i.update_time for i in full_exports if i.file_hash is not None
|
||||
}.items()
|
||||
]
|
||||
)
|
||||
cls.latest_update = datetime.now(tz=ZoneInfo('Asia/Shanghai')).date()
|
||||
|
||||
@classmethod
|
||||
def get_data(cls, unique_identifier: str) -> list[Data]:
|
||||
return [Data(record_at=i[0], tr=i[1]) for i in cls.cache[unique_identifier]]
|
||||
|
||||
@classmethod
|
||||
def start_time(cls) -> datetime:
|
||||
return (
|
||||
datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
- timedelta(days=9)
|
||||
).astimezone(UTC)
|
||||
|
||||
@classmethod
|
||||
async def _load(cls, update_time: datetime, file_hash: str) -> None:
|
||||
try:
|
||||
users = type_validate_json(TetraLeagueSuccess, await cls.decompress(file_hash)).data.users
|
||||
except FileNotFoundError:
|
||||
await cls.clear_invalid(file_hash)
|
||||
return
|
||||
update_time = update_time.astimezone(ZoneInfo('Asia/Shanghai'))
|
||||
for i in users:
|
||||
cls.cache[i.id].add((update_time, i.league.rating))
|
||||
|
||||
@classmethod
|
||||
async def decompress(cls, file_hash: str) -> bytes:
|
||||
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{file_hash}.json.zst'), mode='rb') as file:
|
||||
return ZstdDecompressor().decompress(await file.read())
|
||||
|
||||
@classmethod
|
||||
async def clear_invalid(cls, file_hash: str) -> None:
|
||||
async with get_session() as session:
|
||||
full_exports = (await session.scalars(select(IORank).where(IORank.file_hash == file_hash))).all()
|
||||
for i in full_exports:
|
||||
i.file_hash = None
|
||||
await session.commit()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await FullExport.init()
|
||||
scheduler.add_job(FullExport.update, 'interval', hours=1)
|
||||
@@ -37,7 +37,7 @@ UTC = timezone.utc
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@alc.assign('rank')
|
||||
@alc.assign('TETRIO.rank')
|
||||
async def _(matcher: Matcher, rank: Rank, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
3
nonebot_plugin_tetris_stats/games/tetrio/typing.py
Normal file
3
nonebot_plugin_tetris_stats/games/tetrio/typing.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from typing import Literal
|
||||
|
||||
Template = Literal['v1', 'v2']
|
||||
@@ -1,10 +1,9 @@
|
||||
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
|
||||
from nonebot_plugin_alconna import At, on_alconna
|
||||
from arclet.alconna import Arg, ArgFlag, Args, Subcommand
|
||||
from nonebot_plugin_alconna import At
|
||||
|
||||
from ...utils.exception import MessageFormatError
|
||||
from ...utils.typing import Me
|
||||
from .. import add_default_handlers
|
||||
from ..constant import BIND_COMMAND, QUERY_COMMAND
|
||||
from .. import add_block_handlers, alc
|
||||
from .api import Player
|
||||
from .constant import USER_NAME
|
||||
|
||||
@@ -15,31 +14,28 @@ def get_player(name: str) -> Player | MessageFormatError:
|
||||
return MessageFormatError('用户名/ID不合法')
|
||||
|
||||
|
||||
alc = on_alconna(
|
||||
Alconna(
|
||||
'top',
|
||||
Option(
|
||||
BIND_COMMAND[0],
|
||||
alc.command.add(
|
||||
Subcommand(
|
||||
'TOP',
|
||||
Subcommand(
|
||||
'bind',
|
||||
Args(
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='TOP 用户名',
|
||||
notice='TOP 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN],
|
||||
)
|
||||
),
|
||||
alias=BIND_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='bind',
|
||||
help_text='绑定 TOP 账号',
|
||||
),
|
||||
Option(
|
||||
QUERY_COMMAND[0],
|
||||
Subcommand(
|
||||
'query',
|
||||
Args(
|
||||
Arg(
|
||||
'target',
|
||||
At | Me,
|
||||
notice='@想要查询的人 | 自己',
|
||||
notice='@想要查询的人 / 自己',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
Arg(
|
||||
@@ -49,24 +45,15 @@ alc = on_alconna(
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
),
|
||||
alias=QUERY_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='query',
|
||||
help_text='查询 TOP 游戏信息',
|
||||
),
|
||||
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
|
||||
meta=CommandMeta(
|
||||
description='查询 TetrisOnline波兰服 的信息',
|
||||
example='top绑定scdhh\ntop查我',
|
||||
compact=True,
|
||||
fuzzy_match=True,
|
||||
),
|
||||
),
|
||||
skip_for_unmatch=False,
|
||||
auto_send_output=True,
|
||||
aliases={'TOP'},
|
||||
help_text='TOP 游戏相关指令',
|
||||
)
|
||||
)
|
||||
|
||||
from . import bind, query # noqa: E402, F401
|
||||
alc.shortcut('(?i:top)(?i:绑|绑定|bind)', {'command': 'tstats TOP bind', 'humanized': 'top绑定'})
|
||||
alc.shortcut('(?i:top)(?i:查|查询|query|stats)', {'command': 'tstats TOP query', 'humanized': 'top查'})
|
||||
|
||||
add_default_handlers(alc)
|
||||
add_block_handlers(alc.assign('TOP.query'))
|
||||
|
||||
from . import bind, query # noqa: E402, F401
|
||||
@@ -1,16 +1,13 @@
|
||||
from urllib.parse import urlunparse
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import User # type: ignore[import-untyped]
|
||||
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
|
||||
|
||||
from ...db import BindStatus, create_or_update_bind, trigger
|
||||
from ...utils.avatar import get_avatar
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.image import get_avatar
|
||||
from ...utils.render import Bind, render
|
||||
from ...utils.render.schemas.base import People
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -19,13 +16,13 @@ from .api import Player
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
@alc.assign('bind')
|
||||
@alc.assign('TOP.bind')
|
||||
async def _(
|
||||
bot: Bot,
|
||||
nb_user: User,
|
||||
account: Player,
|
||||
event_session: EventSession,
|
||||
bot_info: UserInfo = BotUserInfo(), # noqa: B008
|
||||
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
|
||||
bot_info: UserInfo = BotUserInfo(), # noqa: B008
|
||||
):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
@@ -37,15 +34,14 @@ async def _(
|
||||
async with get_session() as session:
|
||||
bind_status = await create_or_update_bind(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=event_user_info.user_id,
|
||||
user=nb_user,
|
||||
game_platform=GAME_TYPE,
|
||||
game_account=user.unique_identifier,
|
||||
)
|
||||
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
|
||||
async with HostPage(
|
||||
await render(
|
||||
'binding',
|
||||
'v1/binding',
|
||||
Bind(
|
||||
platform=GAME_TYPE,
|
||||
status='unknown',
|
||||
@@ -62,5 +58,5 @@ async def _(
|
||||
)
|
||||
) as page_hash:
|
||||
await UniMessage.image(
|
||||
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
).finish()
|
||||
@@ -1,14 +1,14 @@
|
||||
from nonebot.adapters import Bot, Event
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user # type: ignore[import-untyped]
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.metrics import get_metrics
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.typing import Me
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
@@ -17,8 +17,8 @@ from .api.schemas.user_profile import UserProfile
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
|
||||
@alc.assign('TOP.query')
|
||||
async def _(event: Event, matcher: Matcher, target: At | Me, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
@@ -28,8 +28,9 @@ async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_ses
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=(target.target if isinstance(target, At) else event.get_user_id()),
|
||||
user=await get_user(
|
||||
event_session.platform, target.target if isinstance(target, At) else event.get_user_id()
|
||||
),
|
||||
game_platform=GAME_TYPE,
|
||||
)
|
||||
if bind is None:
|
||||
@@ -38,7 +39,7 @@ async def _(bot: Bot, event: Event, matcher: Matcher, target: At | Me, event_ses
|
||||
await (message + make_query_text(await Player(user_name=bind.game_account, trust=True).get_profile())).finish()
|
||||
|
||||
|
||||
@alc.assign('query')
|
||||
@alc.assign('TOP.query')
|
||||
async def _(account: Player, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
@@ -1,10 +1,9 @@
|
||||
from arclet.alconna import Alconna, AllParam, Arg, ArgFlag, Args, CommandMeta, Option
|
||||
from nonebot_plugin_alconna import At, on_alconna
|
||||
from arclet.alconna import Arg, ArgFlag, Args, Subcommand
|
||||
from nonebot_plugin_alconna import At
|
||||
|
||||
from ...utils.exception import MessageFormatError
|
||||
from ...utils.typing import Me
|
||||
from .. import add_default_handlers
|
||||
from ..constant import BIND_COMMAND, QUERY_COMMAND
|
||||
from .. import add_block_handlers, alc
|
||||
from .api import Player
|
||||
from .constant import USER_NAME
|
||||
|
||||
@@ -20,31 +19,28 @@ def get_player(teaid_or_name: str) -> Player | MessageFormatError:
|
||||
return MessageFormatError('用户名/ID不合法')
|
||||
|
||||
|
||||
alc = on_alconna(
|
||||
Alconna(
|
||||
'茶服',
|
||||
Option(
|
||||
BIND_COMMAND[0],
|
||||
alc.command.add(
|
||||
Subcommand(
|
||||
'TOS',
|
||||
Subcommand(
|
||||
'bind',
|
||||
Args(
|
||||
Arg(
|
||||
'account',
|
||||
get_player,
|
||||
notice='茶服 用户名 / TeaID',
|
||||
notice='茶服 用户名 / ID',
|
||||
flags=[ArgFlag.HIDDEN],
|
||||
)
|
||||
),
|
||||
alias=BIND_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='bind',
|
||||
help_text='绑定 茶服 账号',
|
||||
),
|
||||
Option(
|
||||
QUERY_COMMAND[0],
|
||||
Subcommand(
|
||||
'query',
|
||||
Args(
|
||||
Arg(
|
||||
'target',
|
||||
At | Me,
|
||||
notice='@想要查询的人 | 自己',
|
||||
notice='@想要查询的人 / 自己',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
Arg(
|
||||
@@ -53,27 +49,16 @@ alc = on_alconna(
|
||||
notice='茶服 用户名 / TeaID',
|
||||
flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL],
|
||||
),
|
||||
# 如果放在一个 Union Args 里, 验证顺序不能保证, 可能出错
|
||||
),
|
||||
alias=QUERY_COMMAND[1:],
|
||||
compact=True,
|
||||
dest='query',
|
||||
help_text='查询 茶服 游戏信息',
|
||||
),
|
||||
Arg('other', AllParam, flags=[ArgFlag.HIDDEN, ArgFlag.OPTIONAL]),
|
||||
meta=CommandMeta(
|
||||
description='查询 TetrisOnline茶服 的信息',
|
||||
example='茶服查我',
|
||||
compact=True,
|
||||
fuzzy_match=True,
|
||||
),
|
||||
),
|
||||
skip_for_unmatch=False,
|
||||
auto_send_output=True,
|
||||
aliases={'tos', 'TOS'},
|
||||
help_text='茶服 游戏相关指令',
|
||||
)
|
||||
)
|
||||
|
||||
alc.shortcut('(?i:tos|茶服)(?i:绑|绑定|bind)', {'command': 'tstats TOS bind', 'humanized': '茶服绑定'})
|
||||
alc.shortcut('(?i:tos|茶服)(?i:查|查询|query|stats)', {'command': 'tstats TOS query', 'humanized': '茶服查'})
|
||||
|
||||
add_block_handlers(alc.assign('TOS.query'))
|
||||
|
||||
from . import bind, query # noqa: E402, F401
|
||||
|
||||
add_default_handlers(alc)
|
||||
@@ -1,16 +1,13 @@
|
||||
from urllib.parse import urlunparse
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import User # type: ignore[import-untyped]
|
||||
from nonebot_plugin_userinfo import BotUserInfo, EventUserInfo, UserInfo # type: ignore[import-untyped]
|
||||
|
||||
from ...db import BindStatus, create_or_update_bind, trigger
|
||||
from ...utils.avatar import get_avatar
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.platform import get_platform
|
||||
from ...utils.image import get_avatar
|
||||
from ...utils.render import Bind, render
|
||||
from ...utils.render.schemas.base import People
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -19,13 +16,13 @@ from .api import Player
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
@alc.assign('bind')
|
||||
@alc.assign('TOS.bind')
|
||||
async def _(
|
||||
bot: Bot,
|
||||
nb_user: User,
|
||||
account: Player,
|
||||
event_session: EventSession,
|
||||
bot_info: UserInfo = BotUserInfo(), # noqa: B008
|
||||
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
|
||||
bot_info: UserInfo = BotUserInfo(), # noqa: B008
|
||||
):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
@@ -37,8 +34,7 @@ async def _(
|
||||
async with get_session() as session:
|
||||
bind_status = await create_or_update_bind(
|
||||
session=session,
|
||||
chat_platform=get_platform(bot),
|
||||
chat_account=event_user_info.user_id,
|
||||
user=nb_user,
|
||||
game_platform=GAME_TYPE,
|
||||
game_account=user.unique_identifier,
|
||||
)
|
||||
@@ -46,7 +42,7 @@ async def _(
|
||||
if bind_status in (BindStatus.SUCCESS, BindStatus.UPDATE):
|
||||
async with HostPage(
|
||||
await render(
|
||||
'binding',
|
||||
'v1/binding',
|
||||
Bind(
|
||||
platform=GAME_TYPE,
|
||||
status='unknown',
|
||||
@@ -62,5 +58,5 @@ async def _(
|
||||
)
|
||||
) as page_hash:
|
||||
await UniMessage.image(
|
||||
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
|
||||
raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
).finish()
|
||||
252
nonebot_plugin_tetris_stats/games/tos/query.py
Normal file
252
nonebot_plugin_tetris_stats/games/tos/query.py
Normal file
@@ -0,0 +1,252 @@
|
||||
from asyncio import gather
|
||||
from datetime import timedelta
|
||||
from http import HTTPStatus
|
||||
from typing import Literal, NamedTuple
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot_plugin_alconna import At
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user # type: ignore[import-untyped]
|
||||
from nonebot_plugin_userinfo import EventUserInfo, UserInfo # type: ignore[import-untyped]
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.exception import RequestError
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.image import get_avatar
|
||||
from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.base import People, Ranking
|
||||
from ...utils.render.schemas.tos_info import Info, Multiplayer, Radar
|
||||
from ...utils.screenshot import screenshot
|
||||
from ...utils.typing import Me, Number
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
from .api import Player
|
||||
from .api.schemas.user_info import UserInfoSuccess
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
def add_special_handlers(
|
||||
teaid_prefix: Literal['onebot-', 'kook-', 'discord-', 'qqguild-'], match_event: type[Event]
|
||||
) -> None:
|
||||
@alc.assign('TOS.query')
|
||||
async def _(
|
||||
event: Event,
|
||||
target: At | Me,
|
||||
event_session: EventSession,
|
||||
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
|
||||
):
|
||||
if isinstance(event, match_event):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
player = Player(
|
||||
teaid=f'{teaid_prefix}{target.target}'
|
||||
if isinstance(target, At)
|
||||
else f'{teaid_prefix}{event.get_user_id()}',
|
||||
trust=True,
|
||||
)
|
||||
try:
|
||||
user_info, game_data = await gather(player.get_info(), get_game_data(player))
|
||||
if game_data is not None:
|
||||
await UniMessage.image(
|
||||
raw=await make_query_image(user_info, game_data, event_user_info)
|
||||
).finish()
|
||||
await make_query_text(user_info, game_data).finish()
|
||||
except RequestError as e:
|
||||
if e.status_code == HTTPStatus.BAD_REQUEST and '未找到此用户' in e.message:
|
||||
return
|
||||
raise
|
||||
|
||||
|
||||
try:
|
||||
from nonebot.adapters.onebot.v11 import MessageEvent as OB11MessageEvent
|
||||
|
||||
add_special_handlers('onebot-', OB11MessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.qq.event import GuildMessageEvent as QQGuildMessageEvent
|
||||
from nonebot.adapters.qq.event import QQMessageEvent
|
||||
|
||||
add_special_handlers('qqguild-', QQGuildMessageEvent)
|
||||
add_special_handlers('onebot-', QQMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.kaiheila.event import MessageEvent as KookMessageEvent
|
||||
|
||||
add_special_handlers('kook-', KookMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
try:
|
||||
from nonebot.adapters.discord import MessageEvent as DiscordMessageEvent
|
||||
|
||||
add_special_handlers('discord-', DiscordMessageEvent)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
@alc.assign('TOS.query')
|
||||
async def _(
|
||||
event: Event,
|
||||
matcher: Matcher,
|
||||
target: At | Me,
|
||||
event_session: EventSession,
|
||||
event_user_info: UserInfo = EventUserInfo(), # noqa: B008
|
||||
):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
session=session,
|
||||
user=await get_user(
|
||||
event_session.platform, target.target if isinstance(target, At) else event.get_user_id()
|
||||
),
|
||||
game_platform=GAME_TYPE,
|
||||
)
|
||||
if bind is None:
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = CANT_VERIFY_MESSAGE
|
||||
player = Player(teaid=bind.game_account, trust=True)
|
||||
user_info, game_data = await gather(player.get_info(), get_game_data(player))
|
||||
if game_data is not None:
|
||||
await (
|
||||
message + UniMessage.image(raw=await make_query_image(user_info, game_data, event_user_info))
|
||||
).finish()
|
||||
await (message + make_query_text(user_info, game_data)).finish()
|
||||
|
||||
|
||||
@alc.assign('TOS.query')
|
||||
async def _(account: Player, event_session: EventSession, event_user_info: UserInfo = EventUserInfo()): # noqa: B008
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
user_info, game_data = await gather(account.get_info(), get_game_data(account))
|
||||
if game_data is not None:
|
||||
await UniMessage.image(raw=await make_query_image(user_info, game_data, event_user_info)).finish()
|
||||
await make_query_text(user_info, game_data).finish()
|
||||
|
||||
|
||||
class GameData(NamedTuple):
|
||||
game_num: int
|
||||
metrics: TetrisMetricsProWithLPMADPM
|
||||
OR: Number
|
||||
dspp: Number
|
||||
ge: Number
|
||||
|
||||
|
||||
async def get_game_data(player: Player, query_num: int = 50) -> GameData | None:
|
||||
"""获取游戏数据"""
|
||||
user_profile = await player.get_profile()
|
||||
if user_profile.data == []:
|
||||
return None
|
||||
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = 0.0
|
||||
total_attack = total_dig = total_offset = total_pieses = total_receive = num = 0
|
||||
for i in user_profile.data:
|
||||
# 排除单人局和时间为0的游戏
|
||||
# 茶: 不计算没挖掘的局, 即使apm和lpm也如此
|
||||
if i.num_players == 1 or i.time == 0 or i.dig is None:
|
||||
continue
|
||||
# 加权计算
|
||||
time = i.time / 1000
|
||||
lpm = 24 * (i.pieces / time)
|
||||
apm = (i.attack / time) * 60
|
||||
adpm = ((i.attack + i.dig) / time) * 60
|
||||
weighted_total_lpm += lpm * time
|
||||
weighted_total_apm += apm * time
|
||||
weighted_total_adpm += adpm * time
|
||||
total_attack += i.attack
|
||||
total_dig += i.dig
|
||||
total_offset += i.offset
|
||||
total_pieses += i.pieces
|
||||
total_receive += i.receive
|
||||
total_time += time
|
||||
num += 1
|
||||
if num >= query_num:
|
||||
break
|
||||
if num == 0:
|
||||
return None
|
||||
# TODO: 如果有效局数小于 {查询数} , 并且没有无dig信息的局, 且 user_profile.data 内有{请求数}个局, 则继续往前获取信息
|
||||
metrics = get_metrics(
|
||||
lpm=weighted_total_lpm / total_time, apm=weighted_total_apm / total_time, adpm=weighted_total_adpm / total_time
|
||||
)
|
||||
return GameData(
|
||||
game_num=num,
|
||||
metrics=metrics,
|
||||
OR=total_offset / total_receive * 100,
|
||||
dspp=total_dig / total_pieses,
|
||||
ge=2 * ((total_attack * total_dig) / total_pieses**2),
|
||||
)
|
||||
|
||||
|
||||
async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, event_user_info: UserInfo) -> bytes:
|
||||
metrics = game_data.metrics
|
||||
duration = timedelta(milliseconds=float(user_info.data.pb_sprint)).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v1/tos/info',
|
||||
Info(
|
||||
user=People(avatar=await get_avatar(event_user_info, 'Data URI', None), name=user_info.data.name),
|
||||
ranking=Ranking(rating=float(user_info.data.ranking), rd=round(float(user_info.data.rd_now), 2)),
|
||||
multiplayer=Multiplayer(
|
||||
pps=metrics.pps,
|
||||
lpm=metrics.lpm,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs,
|
||||
adpm=metrics.adpm,
|
||||
adpl=metrics.adpl,
|
||||
),
|
||||
radar=Radar(
|
||||
app=(app := (metrics.apm / (60 * metrics.pps))),
|
||||
OR=game_data.OR,
|
||||
dspp=game_data.dspp,
|
||||
ci=150 * game_data.dspp - 125 * app + 50 * (metrics.vs / metrics.apm) - 25,
|
||||
ge=game_data.ge,
|
||||
),
|
||||
sprint=sprint_value,
|
||||
challenge=f'{int(user_info.data.pb_challenge):,}' if user_info.data.pb_challenge != '0' else 'N/A',
|
||||
marathon=f'{int(user_info.data.pb_marathon):,}' if user_info.data.pb_marathon != '0' else 'N/A',
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
|
||||
|
||||
def make_query_text(user_info: UserInfoSuccess, game_data: GameData | None) -> UniMessage:
|
||||
user_data = user_info.data
|
||||
message = f'用户 {user_data.name} ({user_data.teaid}) '
|
||||
if user_data.ranked_games == '0':
|
||||
message += '暂无段位统计数据'
|
||||
else:
|
||||
message += f', 段位分 {round(float(user_data.rating_now),2)}±{round(float(user_data.rd_now),2)} ({round(float(user_data.vol_now),2)}) '
|
||||
if game_data is None:
|
||||
message += ', 暂无游戏数据'
|
||||
else:
|
||||
message += f', 最近 {game_data.game_num} 局数据'
|
||||
message += f"\nL'PM: {game_data.metrics.lpm} ( {game_data.metrics.pps} pps )"
|
||||
message += f'\nAPM: {game_data.metrics.apm} ( x{game_data.metrics.apl} )'
|
||||
message += f'\nADPM: {game_data.metrics.adpm} ( x{game_data.metrics.adpl} ) ( {game_data.metrics.vs}vs )'
|
||||
message += f'\n40L: {float(user_data.pb_sprint)/1000:.2f}s' if user_data.pb_sprint != '2147483647' else ''
|
||||
message += f'\nMarathon: {user_data.pb_marathon}' if user_data.pb_marathon != '0' else ''
|
||||
message += f'\nChallenge: {user_data.pb_challenge}' if user_data.pb_challenge != '0' else ''
|
||||
return UniMessage(message)
|
||||
@@ -27,6 +27,10 @@ class MessageFormatError(NeedCatchError):
|
||||
"""用户发送的消息格式不正确"""
|
||||
|
||||
|
||||
class FallbackError(NeedCatchError):
|
||||
"""需要回滚至更通用的方法"""
|
||||
|
||||
|
||||
class DoNotCatchError(TetrisStatsError):
|
||||
"""不应该被捕获的异常基类"""
|
||||
|
||||
@@ -35,5 +39,5 @@ class WhatTheFuckError(DoNotCatchError):
|
||||
"""用于表示不应该出现的情况 ("""
|
||||
|
||||
|
||||
class HandleNotFinishedError(DoNotCatchError):
|
||||
"""任务没有正常完成处理的错误"""
|
||||
class NoFallbackError(DoNotCatchError): # 暂时没用 但是先写了
|
||||
"""没有可用的回退方法"""
|
||||
|
||||
@@ -1,26 +1,27 @@
|
||||
from hashlib import sha256
|
||||
from ipaddress import IPv4Address, IPv6Address
|
||||
from typing import TYPE_CHECKING, ClassVar
|
||||
from typing import TYPE_CHECKING, ClassVar, Literal
|
||||
|
||||
from fastapi import FastAPI, status
|
||||
from fastapi.responses import HTMLResponse
|
||||
from fastapi import FastAPI, Path, status
|
||||
from fastapi.responses import FileResponse, HTMLResponse, Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from nonebot import get_app, get_driver
|
||||
from nonebot.log import logger
|
||||
from nonebot_plugin_localstore import get_cache_dir # type: ignore[import-untyped]
|
||||
|
||||
from ..config.config import CACHE_PATH
|
||||
from .image import img_to_png
|
||||
from .request import Request
|
||||
from .templates import templates_dir
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pydantic import IPvAnyAddress
|
||||
|
||||
app = get_app()
|
||||
app: FastAPI = get_app()
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
global_config = driver.config
|
||||
|
||||
cache_dir = get_cache_dir('nonebot_plugin_tetris_stats')
|
||||
|
||||
if not isinstance(app, FastAPI):
|
||||
msg = '本插件需要 FastAPI 驱动器才能运行'
|
||||
@@ -60,6 +61,22 @@ async def _(page_hash: str) -> HTMLResponse:
|
||||
return NOT_FOUND
|
||||
|
||||
|
||||
@app.get('/host/resource/tetrio/{resource_type}/{user_id}', status_code=status.HTTP_200_OK)
|
||||
async def _(
|
||||
resource_type: Literal['avatars', 'banners'], revision: int, user_id: str = Path(regex=r'^[a-f0-9]{24}$')
|
||||
) -> Response:
|
||||
if not (path := CACHE_PATH / 'tetrio' / resource_type / f'{user_id}_{revision}.png').exists():
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(
|
||||
img_to_png(
|
||||
await Request.request(
|
||||
f'https://tetr.io/user-content/{resource_type}/{user_id}.jpg?rv={revision}', is_json=False
|
||||
)
|
||||
)
|
||||
)
|
||||
return FileResponse(path)
|
||||
|
||||
|
||||
def get_self_netloc() -> str:
|
||||
host: IPv4Address | IPv6Address | IPvAnyAddress = global_config.host
|
||||
if isinstance(host, IPv4Address):
|
||||
|
||||
@@ -52,3 +52,11 @@ async def get_avatar(user: UserInfo, scheme: Literal['Data URI', 'bytes'], defau
|
||||
raise TypeError(msg)
|
||||
return f'data:{Image.MIME[avatar_format]};base64,{b64encode(bot_avatar).decode()}'
|
||||
return bot_avatar
|
||||
|
||||
|
||||
def img_to_png(image: bytes) -> bytes:
|
||||
"""将图片转换为 PNG 格式"""
|
||||
result = BytesIO()
|
||||
with Image.open(BytesIO(image)) as img:
|
||||
img.save(result, 'PNG')
|
||||
return result.getvalue()
|
||||
@@ -1,19 +0,0 @@
|
||||
from nonebot.adapters import Bot
|
||||
|
||||
|
||||
def get_platform(bot: Bot) -> str:
|
||||
try:
|
||||
from nonebot.adapters.onebot.v12 import Bot as OB12Bot
|
||||
|
||||
if isinstance(bot, OB12Bot):
|
||||
return bot.platform
|
||||
except ImportError:
|
||||
pass
|
||||
try:
|
||||
from nonebot.adapters.satori import Bot as SaBot
|
||||
|
||||
if isinstance(bot, SaBot):
|
||||
return bot.platform
|
||||
except ImportError:
|
||||
pass
|
||||
return bot.type
|
||||
@@ -5,7 +5,10 @@ from nonebot.compat import PYDANTIC_V2
|
||||
|
||||
from ..templates import templates_dir
|
||||
from .schemas.bind import Bind
|
||||
from .schemas.tetrio_info import TETRIOInfo
|
||||
from .schemas.tetrio_info import Info as TETRIOInfo
|
||||
from .schemas.tetrio_info_v2 import Info as TETRIOInfoV2
|
||||
from .schemas.top_info import Info as TOPInfo
|
||||
from .schemas.tos_info import Info as TOSInfo
|
||||
|
||||
env = Environment(
|
||||
loader=FileSystemLoader(templates_dir), autoescape=True, trim_blocks=True, lstrip_blocks=True, enable_async=True
|
||||
@@ -13,17 +16,34 @@ env = Environment(
|
||||
|
||||
|
||||
@overload
|
||||
async def render(render_type: Literal['binding'], data: Bind) -> str: ...
|
||||
async def render(render_type: Literal['v1/binding'], data: Bind) -> str: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def render(render_type: Literal['tetrio/info'], data: TETRIOInfo) -> str: ...
|
||||
async def render(render_type: Literal['v1/tetrio/info'], data: TETRIOInfo) -> str: ...
|
||||
|
||||
|
||||
async def render(render_type: Literal['binding', 'tetrio/info'], data: Bind | TETRIOInfo) -> str:
|
||||
@overload
|
||||
async def render(render_type: Literal['v2/tetrio/info'], data: TETRIOInfoV2) -> str: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def render(render_type: Literal['v1/top/info'], data: TOPInfo) -> str: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def render(render_type: Literal['v1/tos/info'], data: TOSInfo) -> str: ...
|
||||
|
||||
|
||||
async def render(
|
||||
render_type: Literal['v1/binding', 'v1/tetrio/info', 'v2/tetrio/info', 'v1/top/info', 'v1/tos/info'],
|
||||
data: Bind | TETRIOInfo | TETRIOInfoV2 | TOPInfo | TOSInfo,
|
||||
) -> str:
|
||||
if PYDANTIC_V2:
|
||||
return await env.get_template('index.html').render_async(path=render_type, data=data.model_dump_json())
|
||||
return await env.get_template('index.html').render_async(path=render_type, data=data.json())
|
||||
return await env.get_template('index.html').render_async(
|
||||
path=render_type, data=data.model_dump_json(by_alias=True)
|
||||
)
|
||||
return await env.get_template('index.html').render_async(path=render_type, data=data.json(by_alias=True))
|
||||
|
||||
|
||||
__all__ = ['render', 'Bind', 'TETRIOInfo']
|
||||
__all__ = ['render', 'Bind']
|
||||
|
||||
@@ -2,6 +2,8 @@ from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ...typing import Number
|
||||
|
||||
|
||||
class Avatar(BaseModel):
|
||||
type: Literal['identicon']
|
||||
@@ -11,3 +13,8 @@ class Avatar(BaseModel):
|
||||
class People(BaseModel):
|
||||
avatar: str | Avatar
|
||||
name: str
|
||||
|
||||
|
||||
class Ranking(BaseModel):
|
||||
rating: Number
|
||||
rd: Number
|
||||
|
||||
@@ -4,9 +4,9 @@ from typing import Annotated, ClassVar
|
||||
from nonebot.compat import PYDANTIC_V2
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ....game_data_processor.io_data_processor.api.typing import Rank
|
||||
from ....games.tetrio.api.typing import Rank
|
||||
from ...typing import Number
|
||||
from .base import People
|
||||
from .base import People, Ranking
|
||||
|
||||
if PYDANTIC_V2:
|
||||
from pydantic import PlainSerializer
|
||||
@@ -20,11 +20,6 @@ class User(People):
|
||||
bio: str | None
|
||||
|
||||
|
||||
class Ranking(BaseModel):
|
||||
rating: Number
|
||||
rd: Number
|
||||
|
||||
|
||||
class TetraLeague(BaseModel):
|
||||
rank: Rank
|
||||
tr: Number
|
||||
@@ -62,7 +57,7 @@ class Radar(BaseModel):
|
||||
ge: Number
|
||||
|
||||
|
||||
class TETRIOInfo(BaseModel):
|
||||
class Info(BaseModel):
|
||||
user: User
|
||||
ranking: Ranking
|
||||
tetra_league: TetraLeague
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ....games.tetrio.api.typing import Rank
|
||||
from ...typing import Number
|
||||
from .base import Avatar
|
||||
|
||||
|
||||
class Badge(BaseModel):
|
||||
id: str
|
||||
description: str
|
||||
group: str | None
|
||||
receive_at: datetime | None
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
id: str
|
||||
name: str
|
||||
country: str | None
|
||||
|
||||
avatar: str | Avatar
|
||||
banner: str | None
|
||||
|
||||
bio: str | None
|
||||
|
||||
friend_count: int
|
||||
supporter_tier: int
|
||||
|
||||
verified: bool
|
||||
bad_standing: bool
|
||||
|
||||
badges: list[Badge]
|
||||
xp: Number
|
||||
|
||||
playtime: str | None
|
||||
join_at: datetime | None
|
||||
|
||||
|
||||
class Statistic(BaseModel):
|
||||
total: int | None
|
||||
wins: int | None
|
||||
|
||||
|
||||
class TetraLeagueStatistic(BaseModel):
|
||||
total: int
|
||||
wins: int
|
||||
|
||||
|
||||
class TetraLeague(BaseModel):
|
||||
rank: Rank
|
||||
highest_rank: Rank
|
||||
|
||||
tr: Number
|
||||
|
||||
glicko: Number
|
||||
rd: Number
|
||||
|
||||
global_rank: int | None
|
||||
country_rank: int | None
|
||||
|
||||
pps: Number
|
||||
|
||||
apm: Number
|
||||
apl: Number
|
||||
|
||||
vs: Number | None
|
||||
adpl: Number | None
|
||||
|
||||
statistic: TetraLeagueStatistic
|
||||
|
||||
|
||||
class Sprint(BaseModel):
|
||||
time: str
|
||||
global_rank: int | None
|
||||
play_at: datetime
|
||||
|
||||
|
||||
class Blitz(BaseModel):
|
||||
score: int
|
||||
global_rank: int | None
|
||||
play_at: datetime
|
||||
|
||||
|
||||
class Zen(BaseModel):
|
||||
score: int
|
||||
level: int
|
||||
|
||||
|
||||
class Info(BaseModel):
|
||||
user: User
|
||||
tetra_league: TetraLeague | None
|
||||
statistic: Statistic | None
|
||||
sprint: Sprint | None
|
||||
blitz: Blitz | None
|
||||
zen: Zen
|
||||
17
nonebot_plugin_tetris_stats/utils/render/schemas/top_info.py
Normal file
17
nonebot_plugin_tetris_stats/utils/render/schemas/top_info.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ...typing import Number
|
||||
from .base import People
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
pps: Number
|
||||
lpm: Number
|
||||
apm: Number
|
||||
apl: Number
|
||||
|
||||
|
||||
class Info(BaseModel):
|
||||
user: People
|
||||
today: Data
|
||||
history: Data
|
||||
32
nonebot_plugin_tetris_stats/utils/render/schemas/tos_info.py
Normal file
32
nonebot_plugin_tetris_stats/utils/render/schemas/tos_info.py
Normal file
@@ -0,0 +1,32 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Number
|
||||
from .base import People, Ranking
|
||||
|
||||
|
||||
class Multiplayer(BaseModel):
|
||||
pps: Number
|
||||
lpm: Number
|
||||
apm: Number
|
||||
apl: Number
|
||||
vs: Number
|
||||
adpm: Number
|
||||
adpl: Number
|
||||
|
||||
|
||||
class Radar(BaseModel):
|
||||
app: Number
|
||||
OR: Number = Field(serialization_alias='or')
|
||||
dspp: Number
|
||||
ci: Number
|
||||
ge: Number
|
||||
|
||||
|
||||
class Info(BaseModel):
|
||||
user: People
|
||||
ranking: Ranking
|
||||
multiplayer: Multiplayer
|
||||
radar: Radar
|
||||
sprint: str
|
||||
challenge: str
|
||||
marathon: str
|
||||
@@ -1,38 +1,44 @@
|
||||
from asyncio import sleep
|
||||
from collections.abc import Awaitable, Callable
|
||||
from collections.abc import Callable, Coroutine
|
||||
from contextlib import suppress
|
||||
from datetime import timedelta
|
||||
from functools import wraps
|
||||
from typing import TypeVar, cast
|
||||
from typing import Any, ParamSpec, TypeVar
|
||||
|
||||
from nonebot.log import logger
|
||||
from nonebot_plugin_alconna.uniseg import SerializeFailed, UniMessage
|
||||
|
||||
T = TypeVar('T')
|
||||
P = ParamSpec('P')
|
||||
|
||||
|
||||
def retry(
|
||||
max_attempts: int = 3,
|
||||
exception_type: type[BaseException] | tuple[type[BaseException], ...] = Exception,
|
||||
delay: timedelta | None = None,
|
||||
) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
|
||||
def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
|
||||
reply: str | UniMessage | None = None,
|
||||
) -> Callable[[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]]:
|
||||
def decorator(func: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, Coroutine[Any, Any, T]]:
|
||||
@wraps(func)
|
||||
async def wrapper(*args, **kwargs) -> T: # noqa: ANN002, ANN003
|
||||
attempts = 0
|
||||
while attempts < max_attempts + 1:
|
||||
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
|
||||
for i in range(max_attempts + 1):
|
||||
if i > 0:
|
||||
message = f'Retrying: {func.__name__} ({i}/{max_attempts})'
|
||||
logger.debug(message)
|
||||
with suppress(SerializeFailed):
|
||||
await UniMessage(reply or message).send()
|
||||
if i == max_attempts:
|
||||
break
|
||||
try:
|
||||
return await func(*args, **kwargs)
|
||||
except exception_type as e: # noqa: PERF203
|
||||
except exception_type as e:
|
||||
if i == max_attempts:
|
||||
raise
|
||||
logger.exception(e)
|
||||
attempts += 1
|
||||
if attempts <= max_attempts:
|
||||
if delay is not None:
|
||||
await sleep(delay.total_seconds())
|
||||
logger.debug(f'Retrying: {func.__name__} ({attempts}/{max_attempts})')
|
||||
continue
|
||||
raise
|
||||
msg = 'Unexpectedly reached the end of the retry loop'
|
||||
raise RuntimeError(msg)
|
||||
if delay is not None:
|
||||
await sleep(delay.total_seconds())
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return cast(Callable[..., Awaitable[T]], wrapper)
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
from playwright.async_api import TimeoutError
|
||||
|
||||
from .browser import BrowserManager
|
||||
from .retry import retry
|
||||
|
||||
|
||||
@retry(exception_type=TimeoutError, reply='截图失败, 重试中')
|
||||
async def screenshot(url: str) -> bytes:
|
||||
browser = await BrowserManager.get_browser()
|
||||
async with (
|
||||
await browser.new_page(no_viewport=True, viewport={'width': 0, 'height': 0}) as page,
|
||||
await browser.new_page(viewport={'width': 3000, 'height': 3000}) as page,
|
||||
):
|
||||
await page.goto(url)
|
||||
await page.wait_for_load_state('networkidle')
|
||||
return await page.screenshot(full_page=True, type='png')
|
||||
return await page.locator('id=content').screenshot(timeout=5000, type='png')
|
||||
|
||||
1380
poetry.lock
generated
1380
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = 'nonebot-plugin-tetris-stats'
|
||||
version = '1.2.1'
|
||||
version = '1.2.12'
|
||||
description = '一款基于 NoneBot2 的用于查询 Tetris 相关游戏数据的插件'
|
||||
authors = ['scdhh <wallfjjd@gmail.com>']
|
||||
readme = 'README.md'
|
||||
@@ -17,6 +17,7 @@ nonebot-plugin-localstore = "^0.6.0"
|
||||
nonebot-plugin-orm = ">=0.1.1,<0.8.0"
|
||||
nonebot-plugin-session = "^0.3.1"
|
||||
nonebot-plugin-session-orm = "^0.2.0"
|
||||
nonebot-plugin-user = "^0.2.0"
|
||||
nonebot-plugin-userinfo = "^0.2.4"
|
||||
aiocache = "^0.12.2"
|
||||
aiofiles = "^23.2.1"
|
||||
@@ -38,14 +39,16 @@ types-lxml = "^2024.2.9"
|
||||
types-pillow = "^10.2.0.20240423"
|
||||
types-ujson = '^5.9.0'
|
||||
pandas-stubs = '>=1.5.2,<3.0.0'
|
||||
nonebot-plugin-orm = { extras = ["default"], version = ">=0.3,<0.8" }
|
||||
nonebot2 = { extras = ["all"], version = "^2.3.0" }
|
||||
nonebot-adapter-discord = "^0.1.3"
|
||||
nonebot-adapter-kaiheila = "^0.3.4"
|
||||
nonebot-adapter-onebot = "^2.4.1"
|
||||
nonebot-adapter-qq = "^1.4.4"
|
||||
nonebot-adapter-satori = "^0.11.4"
|
||||
nonebot-plugin-orm = { extras = ["default"], version = ">=0.3,<0.8" }
|
||||
|
||||
[tool.poetry.group.debug.dependencies]
|
||||
memory-profiler = "^0.61.0"
|
||||
objprint = '^0.2.2'
|
||||
viztracer = "^0.16.2"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user