mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
✨ TETR.IO rank 命令使用图片回复
This commit is contained in:
@@ -92,7 +92,15 @@ command.add(
|
|||||||
),
|
),
|
||||||
Subcommand(
|
Subcommand(
|
||||||
'rank',
|
'rank',
|
||||||
Args(Arg('rank', ValidRank, notice='TETR.IO 段位')),
|
Option(
|
||||||
|
'--all',
|
||||||
|
dest='all',
|
||||||
|
),
|
||||||
|
Option(
|
||||||
|
'--detail',
|
||||||
|
Arg('rank', ValidRank),
|
||||||
|
alias=['-D'],
|
||||||
|
),
|
||||||
help_text='查询 TETR.IO 段位信息',
|
help_text='查询 TETR.IO 段位信息',
|
||||||
),
|
),
|
||||||
Subcommand(
|
Subcommand(
|
||||||
@@ -108,57 +116,54 @@ command.add(
|
|||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def rank_wrapper(slot: int | str, content: str | None):
|
||||||
|
if slot == 'rank' and not content:
|
||||||
|
return '--all'
|
||||||
|
if content is not None:
|
||||||
|
return f'--detail {content.lower()}'
|
||||||
|
return content
|
||||||
|
|
||||||
|
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:绑定|绑|bind)',
|
'(?i:io)(?i:绑定|绑|bind)',
|
||||||
{
|
command='tstats TETR.IO bind',
|
||||||
'command': 'tstats TETR.IO bind',
|
humanized='io绑定',
|
||||||
'humanized': 'io绑定',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:查询|查|query|stats)',
|
'(?i:io)(?i:查询|查|query|stats)',
|
||||||
{
|
command='tstats TETR.IO query',
|
||||||
'command': 'tstats TETR.IO query',
|
humanized='io查',
|
||||||
'humanized': 'io查',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:记录|record)(?i:40l)',
|
'(?i:io)(?i:记录|record)(?i:40l)',
|
||||||
{
|
command='tstats TETR.IO record --40l',
|
||||||
'command': 'tstats TETR.IO record --40l',
|
humanized='io记录40l',
|
||||||
'humanized': 'io记录40l',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:记录|record)(?i:blitz)',
|
'(?i:io)(?i:记录|record)(?i:blitz)',
|
||||||
{
|
command='tstats TETR.IO record --blitz',
|
||||||
'command': 'tstats TETR.IO record --blitz',
|
humanized='io记录blitz',
|
||||||
'humanized': 'io记录blitz',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:段位|段|rank)',
|
r'(?i:io)(?i:段位|段|rank)\s*(?P<rank>[a-zA-Z+-]{0,2})',
|
||||||
{
|
command='tstats TETR.IO rank {rank}',
|
||||||
'command': 'tstats TETR.IO rank',
|
humanized='iorank',
|
||||||
'humanized': 'iorank',
|
fuzzy=False,
|
||||||
},
|
wrapper=rank_wrapper,
|
||||||
)
|
)
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'(?i:io)(?i:配置|配|config)',
|
'(?i:io)(?i:配置|配|config)',
|
||||||
{
|
command='tstats TETR.IO config',
|
||||||
'command': 'tstats TETR.IO config',
|
humanized='io配置',
|
||||||
'humanized': 'io配置',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
alc.shortcut(
|
alc.shortcut(
|
||||||
'fkosk',
|
'fkosk',
|
||||||
{
|
command='tstats TETR.IO query',
|
||||||
'command': 'tstats TETR.IO query',
|
arguments=['我'],
|
||||||
'args': ['我'],
|
fuzzy=False,
|
||||||
'fuzzy': False,
|
humanized='An Easter egg!',
|
||||||
'humanized': 'An Easter egg!',
|
|
||||||
},
|
|
||||||
)
|
)
|
||||||
|
|
||||||
add_block_handlers(alc.assign('TETRIO.query'))
|
add_block_handlers(alc.assign('TETRIO.query'))
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
from re import compile
|
from re import compile
|
||||||
from typing import Literal
|
from typing import Literal
|
||||||
|
|
||||||
from .api.typing import Rank
|
from .api.typing import ValidRank
|
||||||
|
|
||||||
GAME_TYPE: Literal['IO'] = 'IO'
|
GAME_TYPE: Literal['IO'] = 'IO'
|
||||||
|
|
||||||
BASE_URL = 'https://ch.tetr.io/api/'
|
BASE_URL = 'https://ch.tetr.io/api/'
|
||||||
|
|
||||||
RANK_PERCENTILE: dict[Rank, float] = {
|
RANK_PERCENTILE: dict[ValidRank, float] = {
|
||||||
'x': 1,
|
'x': 1,
|
||||||
'u': 5,
|
'u': 5,
|
||||||
'ss': 11,
|
'ss': 11,
|
||||||
|
|||||||
@@ -4,13 +4,13 @@ from nonebot_plugin_orm import Model
|
|||||||
from sqlalchemy import JSON, DateTime, String
|
from sqlalchemy import JSON, DateTime, String
|
||||||
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
|
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
|
||||||
|
|
||||||
from .api.typing import Rank
|
from .api.typing import ValidRank
|
||||||
from .typing import Template
|
from .typing import Template
|
||||||
|
|
||||||
|
|
||||||
class IORank(MappedAsDataclass, Model):
|
class IORank(MappedAsDataclass, Model):
|
||||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||||
rank: Mapped[Rank] = mapped_column(String(2), index=True)
|
rank: Mapped[ValidRank] = mapped_column(String(2), index=True)
|
||||||
tr_line: Mapped[float]
|
tr_line: Mapped[float]
|
||||||
player_count: Mapped[int]
|
player_count: Mapped[int]
|
||||||
low_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
|
low_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
|
||||||
|
|||||||
@@ -1,174 +0,0 @@
|
|||||||
from collections import defaultdict
|
|
||||||
from collections.abc import Callable
|
|
||||||
from datetime import datetime, timedelta, timezone
|
|
||||||
from hashlib import sha512
|
|
||||||
from math import floor
|
|
||||||
from statistics import mean
|
|
||||||
from zoneinfo import ZoneInfo
|
|
||||||
|
|
||||||
from aiofiles import open
|
|
||||||
from nonebot import get_driver
|
|
||||||
from nonebot.compat import model_dump
|
|
||||||
from nonebot.matcher import Matcher
|
|
||||||
from nonebot.utils import run_sync
|
|
||||||
from nonebot_plugin_apscheduler import scheduler # type: ignore[import-untyped]
|
|
||||||
from nonebot_plugin_localstore import get_data_file # type: ignore[import-untyped]
|
|
||||||
from nonebot_plugin_orm import get_session
|
|
||||||
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
|
||||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
|
||||||
from sqlalchemy import func, select
|
|
||||||
from zstandard import ZstdCompressor
|
|
||||||
|
|
||||||
from ...db import trigger
|
|
||||||
from ...utils.exception import RequestError
|
|
||||||
from ...utils.metrics import get_metrics
|
|
||||||
from ...utils.retry import retry
|
|
||||||
from . import alc
|
|
||||||
from .api.schemas.base import FailedModel
|
|
||||||
from .api.schemas.tetra_league import ValidUser
|
|
||||||
from .api.schemas.user import User
|
|
||||||
from .api.tetra_league import full_export
|
|
||||||
from .api.typing import Rank
|
|
||||||
from .constant import GAME_TYPE, RANK_PERCENTILE
|
|
||||||
from .models import IORank
|
|
||||||
|
|
||||||
UTC = timezone.utc
|
|
||||||
|
|
||||||
driver = get_driver()
|
|
||||||
|
|
||||||
|
|
||||||
@alc.assign('TETRIO.rank')
|
|
||||||
async def _(matcher: Matcher, rank: Rank, event_session: EventSession):
|
|
||||||
async with trigger(
|
|
||||||
session_persist_id=await get_session_persist_id(event_session),
|
|
||||||
game_platform=GAME_TYPE,
|
|
||||||
command_type='rank',
|
|
||||||
command_args=[],
|
|
||||||
):
|
|
||||||
if rank == 'z':
|
|
||||||
await matcher.finish('暂不支持查询未知段位')
|
|
||||||
async with get_session() as session:
|
|
||||||
latest_data = (
|
|
||||||
await session.scalars(select(IORank).where(IORank.rank == rank).order_by(IORank.id.desc()).limit(1))
|
|
||||||
).one()
|
|
||||||
compare_data = (
|
|
||||||
await session.scalars(
|
|
||||||
select(IORank)
|
|
||||||
.where(IORank.rank == rank)
|
|
||||||
.order_by(
|
|
||||||
func.abs(
|
|
||||||
func.julianday(IORank.update_time)
|
|
||||||
- func.julianday(latest_data.update_time - timedelta(hours=24))
|
|
||||||
)
|
|
||||||
)
|
|
||||||
.limit(1)
|
|
||||||
)
|
|
||||||
).one()
|
|
||||||
message = ''
|
|
||||||
if (datetime.now(UTC) - latest_data.update_time.replace(tzinfo=UTC)) > timedelta(hours=7):
|
|
||||||
message += 'Warning: 数据超过7小时未更新, 请联系Bot主人查看后台\n'
|
|
||||||
message += f'{rank.upper()} 段 分数线 {latest_data.tr_line:.2f} TR, {latest_data.player_count} 名玩家\n'
|
|
||||||
if compare_data.id != latest_data.id:
|
|
||||||
message += f'对比 {(latest_data.update_time-compare_data.update_time).total_seconds()/3600:.2f} 小时前趋势: {f"↑{difference:.2f}" if (difference:=latest_data.tr_line-compare_data.tr_line) > 0 else f"↓{-difference:.2f}" if difference < 0 else "→"}'
|
|
||||||
else:
|
|
||||||
message += '暂无对比数据'
|
|
||||||
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
|
|
||||||
low_pps = get_metrics(pps=latest_data.low_pps[1])
|
|
||||||
low_vs = get_metrics(vs=latest_data.low_vs[1])
|
|
||||||
max_pps = get_metrics(pps=latest_data.high_pps[1])
|
|
||||||
max_vs = get_metrics(vs=latest_data.high_vs[1])
|
|
||||||
message += (
|
|
||||||
'\n'
|
|
||||||
'平均数据:\n'
|
|
||||||
f"L'PM: {avg.lpm} ( {avg.pps} pps )\n"
|
|
||||||
f'APM: {avg.apm} ( x{avg.apl} )\n'
|
|
||||||
f'ADPM: {avg.adpm} ( x{avg.adpl} ) ( {avg.vs}vs )\n'
|
|
||||||
'\n'
|
|
||||||
'最低数据:\n'
|
|
||||||
f"L'PM: {low_pps.lpm} ( {low_pps.pps} pps ) By: {latest_data.low_pps[0]['name'].upper()}\n"
|
|
||||||
f'APM: {latest_data.low_apm[1]} By: {latest_data.low_apm[0]["name"].upper()}\n'
|
|
||||||
f'ADPM: {low_vs.adpm} ( {low_vs.vs}vs ) By: {latest_data.low_vs[0]["name"].upper()}\n'
|
|
||||||
'\n'
|
|
||||||
'最高数据:\n'
|
|
||||||
f"L'PM: {max_pps.lpm} ( {max_pps.pps} pps ) By: {latest_data.high_pps[0]['name'].upper()}\n"
|
|
||||||
f'APM: {latest_data.high_apm[1]} By: {latest_data.high_apm[0]["name"].upper()}\n'
|
|
||||||
f'ADPM: {max_vs.adpm} ( {max_vs.vs}vs ) By: {latest_data.high_vs[0]["name"].upper()}\n'
|
|
||||||
'\n'
|
|
||||||
f'数据更新时间: {latest_data.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")}'
|
|
||||||
)
|
|
||||||
await matcher.finish(message)
|
|
||||||
|
|
||||||
|
|
||||||
@scheduler.scheduled_job('cron', hour='0,6,12,18', minute=0)
|
|
||||||
@retry(exception_type=RequestError, delay=timedelta(minutes=15))
|
|
||||||
async def get_tetra_league_data() -> None:
|
|
||||||
league, original = await full_export(with_original=True)
|
|
||||||
if isinstance(league, FailedModel):
|
|
||||||
msg = f'排行榜数据请求错误:\n{league.error}'
|
|
||||||
raise RequestError(msg)
|
|
||||||
|
|
||||||
def pps(user: ValidUser) -> float:
|
|
||||||
return user.league.pps
|
|
||||||
|
|
||||||
def apm(user: ValidUser) -> float:
|
|
||||||
return user.league.apm
|
|
||||||
|
|
||||||
def vs(user: ValidUser) -> float:
|
|
||||||
return user.league.vs
|
|
||||||
|
|
||||||
def _min(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
|
|
||||||
return min(users, key=field)
|
|
||||||
|
|
||||||
def _max(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
|
|
||||||
return max(users, key=field)
|
|
||||||
|
|
||||||
def build_extremes_data(
|
|
||||||
users: list[ValidUser],
|
|
||||||
field: Callable[[ValidUser], float],
|
|
||||||
sort: Callable[[list[ValidUser], Callable[[ValidUser], float]], ValidUser],
|
|
||||||
) -> tuple[dict[str, str], float]:
|
|
||||||
user = sort(users, field)
|
|
||||||
return model_dump(User(ID=user.id, name=user.username)), field(user)
|
|
||||||
|
|
||||||
data_hash: str | None = await run_sync((await run_sync(sha512)(original)).hexdigest)()
|
|
||||||
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{data_hash}.json.zst'), mode='wb') as file:
|
|
||||||
await file.write(await run_sync(ZstdCompressor(level=12, threads=-1).compress)(original))
|
|
||||||
|
|
||||||
users = [i for i in league.data.users if isinstance(i, ValidUser)]
|
|
||||||
rank_to_users: defaultdict[Rank, list[ValidUser]] = defaultdict(list)
|
|
||||||
for i in users:
|
|
||||||
rank_to_users[i.league.rank].append(i)
|
|
||||||
rank_info: list[IORank] = []
|
|
||||||
for rank, percentile in RANK_PERCENTILE.items():
|
|
||||||
offset = floor((percentile / 100) * len(users)) - 1
|
|
||||||
tr_line = users[offset].league.rating
|
|
||||||
rank_users = rank_to_users[rank]
|
|
||||||
rank_info.append(
|
|
||||||
IORank(
|
|
||||||
rank=rank,
|
|
||||||
tr_line=tr_line,
|
|
||||||
player_count=len(rank_users),
|
|
||||||
low_pps=(build_extremes_data(rank_users, pps, _min)),
|
|
||||||
low_apm=(build_extremes_data(rank_users, apm, _min)),
|
|
||||||
low_vs=(build_extremes_data(rank_users, vs, _min)),
|
|
||||||
avg_pps=mean({i.league.pps for i in rank_users}),
|
|
||||||
avg_apm=mean({i.league.apm for i in rank_users}),
|
|
||||||
avg_vs=mean({i.league.vs for i in rank_users}),
|
|
||||||
high_pps=(build_extremes_data(rank_users, pps, _max)),
|
|
||||||
high_apm=(build_extremes_data(rank_users, apm, _max)),
|
|
||||||
high_vs=(build_extremes_data(rank_users, vs, _max)),
|
|
||||||
update_time=league.cache.cached_at,
|
|
||||||
file_hash=data_hash,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
async with get_session() as session:
|
|
||||||
session.add_all(rank_info)
|
|
||||||
await session.commit()
|
|
||||||
|
|
||||||
|
|
||||||
@driver.on_startup
|
|
||||||
async def _() -> None:
|
|
||||||
async with get_session() as session:
|
|
||||||
latest_time = await session.scalar(select(IORank.update_time).order_by(IORank.id.desc()).limit(1))
|
|
||||||
if latest_time is None or datetime.now(tz=UTC) - latest_time.replace(tzinfo=UTC) > timedelta(hours=6):
|
|
||||||
await get_tetra_league_data()
|
|
||||||
113
nonebot_plugin_tetris_stats/games/tetrio/rank/__init__.py
Normal file
113
nonebot_plugin_tetris_stats/games/tetrio/rank/__init__.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
from collections import defaultdict
|
||||||
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from hashlib import sha512
|
||||||
|
from math import floor
|
||||||
|
from statistics import mean
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
from aiofiles import open
|
||||||
|
from nonebot import get_driver
|
||||||
|
from nonebot.compat import model_dump
|
||||||
|
from nonebot.utils import run_sync
|
||||||
|
from nonebot_plugin_apscheduler import scheduler
|
||||||
|
from nonebot_plugin_localstore import get_data_file
|
||||||
|
from nonebot_plugin_orm import get_session
|
||||||
|
from sqlalchemy import select
|
||||||
|
from zstandard import ZstdCompressor
|
||||||
|
|
||||||
|
from ....utils.exception import RequestError
|
||||||
|
from ....utils.retry import retry
|
||||||
|
from ..api.schemas.base import FailedModel
|
||||||
|
from ..api.schemas.tetra_league import ValidUser
|
||||||
|
from ..api.schemas.user import User
|
||||||
|
from ..api.tetra_league import full_export
|
||||||
|
from ..constant import RANK_PERCENTILE
|
||||||
|
from ..models import IORank
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from ..api.typing import Rank
|
||||||
|
|
||||||
|
UTC = timezone.utc
|
||||||
|
|
||||||
|
driver = get_driver()
|
||||||
|
|
||||||
|
|
||||||
|
@scheduler.scheduled_job('cron', hour='0,6,12,18', minute=0)
|
||||||
|
@retry(exception_type=RequestError, delay=timedelta(minutes=15))
|
||||||
|
async def get_tetra_league_data() -> None:
|
||||||
|
league, original = await full_export(with_original=True)
|
||||||
|
if isinstance(league, FailedModel):
|
||||||
|
msg = f'排行榜数据请求错误:\n{league.error}'
|
||||||
|
raise RequestError(msg)
|
||||||
|
|
||||||
|
def pps(user: ValidUser) -> float:
|
||||||
|
return user.league.pps
|
||||||
|
|
||||||
|
def apm(user: ValidUser) -> float:
|
||||||
|
return user.league.apm
|
||||||
|
|
||||||
|
def vs(user: ValidUser) -> float:
|
||||||
|
return user.league.vs
|
||||||
|
|
||||||
|
def _min(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
|
||||||
|
return min(users, key=field)
|
||||||
|
|
||||||
|
def _max(users: list[ValidUser], field: Callable[[ValidUser], float]) -> ValidUser:
|
||||||
|
return max(users, key=field)
|
||||||
|
|
||||||
|
def build_extremes_data(
|
||||||
|
users: list[ValidUser],
|
||||||
|
field: Callable[[ValidUser], float],
|
||||||
|
sort: Callable[[list[ValidUser], Callable[[ValidUser], float]], ValidUser],
|
||||||
|
) -> tuple[dict[str, str], float]:
|
||||||
|
user = sort(users, field)
|
||||||
|
return model_dump(User(ID=user.id, name=user.username)), field(user)
|
||||||
|
|
||||||
|
data_hash: str | None = await run_sync((await run_sync(sha512)(original)).hexdigest)()
|
||||||
|
async with open(get_data_file('nonebot_plugin_tetris_stats', f'{data_hash}.json.zst'), mode='wb') as file:
|
||||||
|
await file.write(await run_sync(ZstdCompressor(level=12, threads=-1).compress)(original))
|
||||||
|
|
||||||
|
users = [i for i in league.data.users if isinstance(i, ValidUser)]
|
||||||
|
rank_to_users: defaultdict[Rank, list[ValidUser]] = defaultdict(list)
|
||||||
|
for i in users:
|
||||||
|
rank_to_users[i.league.rank].append(i)
|
||||||
|
rank_info: list[IORank] = []
|
||||||
|
for rank, percentile in RANK_PERCENTILE.items():
|
||||||
|
offset = floor((percentile / 100) * len(users)) - 1
|
||||||
|
tr_line = users[offset].league.rating
|
||||||
|
rank_users = rank_to_users[rank]
|
||||||
|
rank_info.append(
|
||||||
|
IORank(
|
||||||
|
rank=rank,
|
||||||
|
tr_line=tr_line,
|
||||||
|
player_count=len(rank_users),
|
||||||
|
low_pps=(build_extremes_data(rank_users, pps, _min)),
|
||||||
|
low_apm=(build_extremes_data(rank_users, apm, _min)),
|
||||||
|
low_vs=(build_extremes_data(rank_users, vs, _min)),
|
||||||
|
avg_pps=mean({i.league.pps for i in rank_users}),
|
||||||
|
avg_apm=mean({i.league.apm for i in rank_users}),
|
||||||
|
avg_vs=mean({i.league.vs for i in rank_users}),
|
||||||
|
high_pps=(build_extremes_data(rank_users, pps, _max)),
|
||||||
|
high_apm=(build_extremes_data(rank_users, apm, _max)),
|
||||||
|
high_vs=(build_extremes_data(rank_users, vs, _max)),
|
||||||
|
update_time=league.cache.cached_at,
|
||||||
|
file_hash=data_hash,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
async with get_session() as session:
|
||||||
|
session.add_all(rank_info)
|
||||||
|
await session.commit()
|
||||||
|
|
||||||
|
|
||||||
|
@driver.on_startup
|
||||||
|
async def _() -> None:
|
||||||
|
async with get_session() as session:
|
||||||
|
latest_time = await session.scalar(select(IORank.update_time).order_by(IORank.id.desc()).limit(1))
|
||||||
|
if latest_time is None or datetime.now(tz=UTC) - latest_time.replace(tzinfo=UTC) > timedelta(hours=6):
|
||||||
|
await get_tetra_league_data()
|
||||||
|
|
||||||
|
|
||||||
|
from . import all, detail # noqa: E402
|
||||||
|
|
||||||
|
__all__ = ['all', 'detail']
|
||||||
78
nonebot_plugin_tetris_stats/games/tetrio/rank/all.py
Normal file
78
nonebot_plugin_tetris_stats/games/tetrio/rank/all.py
Normal file
@@ -0,0 +1,78 @@
|
|||||||
|
from datetime import timedelta
|
||||||
|
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
from nonebot_plugin_orm import get_session
|
||||||
|
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||||
|
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
|
||||||
|
from ....db import trigger
|
||||||
|
from ....utils.host import HostPage, get_self_netloc
|
||||||
|
from ....utils.metrics import get_metrics
|
||||||
|
from ....utils.render import render
|
||||||
|
from ....utils.render.schemas.tetrio.tetrio_rank import AverageData, Data, ItemData
|
||||||
|
from ....utils.screenshot import screenshot
|
||||||
|
from .. import alc
|
||||||
|
from ..constant import GAME_TYPE
|
||||||
|
from ..models import IORank
|
||||||
|
|
||||||
|
|
||||||
|
@alc.assign('TETRIO.rank.all')
|
||||||
|
async def _(event_session: EventSession):
|
||||||
|
async with trigger(
|
||||||
|
session_persist_id=await get_session_persist_id(event_session),
|
||||||
|
game_platform=GAME_TYPE,
|
||||||
|
command_type='rank',
|
||||||
|
command_args=['--all'],
|
||||||
|
):
|
||||||
|
async with get_session() as session:
|
||||||
|
latest_update_time = (
|
||||||
|
await session.scalars(select(IORank.update_time).order_by(IORank.id.desc()).limit(1))
|
||||||
|
).one()
|
||||||
|
compare_time = (
|
||||||
|
await session.scalars(
|
||||||
|
select(IORank.update_time)
|
||||||
|
.order_by(
|
||||||
|
func.abs(
|
||||||
|
func.julianday(IORank.update_time)
|
||||||
|
- func.julianday(latest_update_time - timedelta(hours=24))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.limit(1)
|
||||||
|
)
|
||||||
|
).one()
|
||||||
|
latest_data = (
|
||||||
|
await session.scalars(
|
||||||
|
select(IORank).where(IORank.update_time == latest_update_time).order_by(IORank.tr_line.desc())
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
compare_data = (
|
||||||
|
await session.scalars(
|
||||||
|
select(IORank).where(IORank.update_time == compare_time).order_by(IORank.tr_line.desc())
|
||||||
|
)
|
||||||
|
).all()
|
||||||
|
async with HostPage(
|
||||||
|
await render(
|
||||||
|
'v2/tetrio/rank',
|
||||||
|
Data(
|
||||||
|
items={
|
||||||
|
i[0].rank: ItemData(
|
||||||
|
require_tr=round(i[0].tr_line, 2),
|
||||||
|
trending=round(i[0].tr_line - i[1].tr_line, 2),
|
||||||
|
average_data=AverageData(
|
||||||
|
pps=(metrics := get_metrics(pps=i[0].avg_pps, apm=i[0].avg_apm, vs=i[0].avg_vs)).pps,
|
||||||
|
apm=metrics.apm,
|
||||||
|
apl=metrics.apl,
|
||||||
|
vs=metrics.vs,
|
||||||
|
adpl=metrics.adpl,
|
||||||
|
),
|
||||||
|
players=i[0].player_count,
|
||||||
|
)
|
||||||
|
for i in zip(latest_data, compare_data, strict=True)
|
||||||
|
},
|
||||||
|
updated_at=latest_update_time,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
) as page_hash:
|
||||||
|
await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||||
|
await UniMessage.image(raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')).finish()
|
||||||
138
nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py
Normal file
138
nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from nonebot import get_driver
|
||||||
|
from nonebot_plugin_alconna import UniMessage
|
||||||
|
from nonebot_plugin_orm import get_session
|
||||||
|
from nonebot_plugin_session import EventSession # type: ignore[import-untyped]
|
||||||
|
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||||
|
from sqlalchemy import func, select
|
||||||
|
|
||||||
|
from ....db import trigger
|
||||||
|
from ....utils.host import HostPage, get_self_netloc
|
||||||
|
from ....utils.metrics import get_metrics
|
||||||
|
from ....utils.render import render
|
||||||
|
from ....utils.render.schemas.tetrio.tetrio_rank_detail import Data, SpecialData
|
||||||
|
from ....utils.screenshot import screenshot
|
||||||
|
from .. import alc
|
||||||
|
from ..api.typing import ValidRank
|
||||||
|
from ..constant import GAME_TYPE
|
||||||
|
from ..models import IORank
|
||||||
|
|
||||||
|
UTC = timezone.utc
|
||||||
|
|
||||||
|
driver = get_driver()
|
||||||
|
|
||||||
|
|
||||||
|
@alc.assign('TETRIO.rank')
|
||||||
|
async def _(rank: ValidRank, event_session: EventSession):
|
||||||
|
async with trigger(
|
||||||
|
session_persist_id=await get_session_persist_id(event_session),
|
||||||
|
game_platform=GAME_TYPE,
|
||||||
|
command_type='rank',
|
||||||
|
command_args=[f'--detail {rank}'],
|
||||||
|
):
|
||||||
|
async with get_session() as session:
|
||||||
|
latest_data = (
|
||||||
|
await session.scalars(select(IORank).where(IORank.rank == rank).order_by(IORank.id.desc()).limit(1))
|
||||||
|
).one()
|
||||||
|
compare_data = (
|
||||||
|
await session.scalars(
|
||||||
|
select(IORank)
|
||||||
|
.where(IORank.rank == rank)
|
||||||
|
.order_by(
|
||||||
|
func.abs(
|
||||||
|
func.julianday(IORank.update_time)
|
||||||
|
- func.julianday(latest_data.update_time - timedelta(hours=24))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
.limit(1)
|
||||||
|
)
|
||||||
|
).one()
|
||||||
|
await UniMessage.image(raw=await make_image(latest_data, compare_data)).finish()
|
||||||
|
|
||||||
|
|
||||||
|
async def make_image(latest_data: IORank, compare_data: IORank) -> bytes:
|
||||||
|
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
|
||||||
|
low_pps = get_metrics(pps=latest_data.low_pps[1])
|
||||||
|
low_vs = get_metrics(vs=latest_data.low_vs[1])
|
||||||
|
max_pps = get_metrics(pps=latest_data.high_pps[1])
|
||||||
|
max_vs = get_metrics(vs=latest_data.high_vs[1])
|
||||||
|
async with HostPage(
|
||||||
|
await render(
|
||||||
|
'v2/tetrio/rank/detail',
|
||||||
|
Data(
|
||||||
|
name=latest_data.rank,
|
||||||
|
trending=round(latest_data.tr_line - compare_data.tr_line, 2),
|
||||||
|
require_tr=round(latest_data.tr_line, 2),
|
||||||
|
players=latest_data.player_count,
|
||||||
|
minimum_data=SpecialData(
|
||||||
|
apm=latest_data.low_apm[1],
|
||||||
|
pps=low_pps.pps,
|
||||||
|
lpm=low_pps.lpm,
|
||||||
|
vs=low_vs.vs,
|
||||||
|
adpm=low_vs.adpm,
|
||||||
|
apm_holder=latest_data.low_apm[0]['name'].upper(),
|
||||||
|
pps_holder=latest_data.low_pps[0]['name'].upper(),
|
||||||
|
vs_holder=latest_data.low_vs[0]['name'].upper(),
|
||||||
|
),
|
||||||
|
average_data=SpecialData(
|
||||||
|
apm=avg.apm,
|
||||||
|
pps=avg.pps,
|
||||||
|
lpm=avg.lpm,
|
||||||
|
vs=avg.vs,
|
||||||
|
adpm=avg.adpm,
|
||||||
|
apl=avg.apl,
|
||||||
|
adpl=avg.adpl,
|
||||||
|
),
|
||||||
|
maximum_data=SpecialData(
|
||||||
|
apm=latest_data.high_apm[1],
|
||||||
|
pps=max_pps.pps,
|
||||||
|
lpm=max_pps.lpm,
|
||||||
|
vs=max_vs.vs,
|
||||||
|
adpm=max_vs.adpm,
|
||||||
|
apm_holder=latest_data.high_apm[0]['name'].upper(),
|
||||||
|
pps_holder=latest_data.high_pps[0]['name'].upper(),
|
||||||
|
vs_holder=latest_data.high_vs[0]['name'].upper(),
|
||||||
|
),
|
||||||
|
updated_at=latest_data.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo('Asia/Shanghai')),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
) as page_hash:
|
||||||
|
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||||
|
|
||||||
|
|
||||||
|
async def make_text(latest_data: IORank, compare_data: IORank) -> str:
|
||||||
|
message = ''
|
||||||
|
if (datetime.now(UTC) - latest_data.update_time.replace(tzinfo=UTC)) > timedelta(hours=7):
|
||||||
|
message += 'Warning: 数据超过7小时未更新, 请联系Bot主人查看后台\n'
|
||||||
|
message += f'{latest_data.rank.upper()} 段 分数线 {latest_data.tr_line:.2f} TR, {latest_data.player_count} 名玩家\n'
|
||||||
|
if compare_data.id != latest_data.id:
|
||||||
|
message += f'对比 {(latest_data.update_time-compare_data.update_time).total_seconds()/3600:.2f} 小时前趋势: {f"↑{difference:.2f}" if (difference:=latest_data.tr_line-compare_data.tr_line) > 0 else f"↓{-difference:.2f}" if difference < 0 else "→"}'
|
||||||
|
else:
|
||||||
|
message += '暂无对比数据'
|
||||||
|
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
|
||||||
|
low_pps = get_metrics(pps=latest_data.low_pps[1])
|
||||||
|
low_vs = get_metrics(vs=latest_data.low_vs[1])
|
||||||
|
max_pps = get_metrics(pps=latest_data.high_pps[1])
|
||||||
|
max_vs = get_metrics(vs=latest_data.high_vs[1])
|
||||||
|
message += (
|
||||||
|
'\n'
|
||||||
|
'平均数据:\n'
|
||||||
|
f"L'PM: {avg.lpm} ( {avg.pps} pps )\n"
|
||||||
|
f'APM: {avg.apm} ( x{avg.apl} )\n'
|
||||||
|
f'ADPM: {avg.adpm} ( x{avg.adpl} ) ( {avg.vs}vs )\n'
|
||||||
|
'\n'
|
||||||
|
'最低数据:\n'
|
||||||
|
f"L'PM: {low_pps.lpm} ( {low_pps.pps} pps ) By: {latest_data.low_pps[0]['name'].upper()}\n"
|
||||||
|
f'APM: {latest_data.low_apm[1]} By: {latest_data.low_apm[0]["name"].upper()}\n'
|
||||||
|
f'ADPM: {low_vs.adpm} ( {low_vs.vs}vs ) By: {latest_data.low_vs[0]["name"].upper()}\n'
|
||||||
|
'\n'
|
||||||
|
'最高数据:\n'
|
||||||
|
f"L'PM: {max_pps.lpm} ( {max_pps.pps} pps ) By: {latest_data.high_pps[0]['name'].upper()}\n"
|
||||||
|
f'APM: {latest_data.high_apm[1]} By: {latest_data.high_apm[0]["name"].upper()}\n'
|
||||||
|
f'ADPM: {max_vs.adpm} ( {max_vs.vs}vs ) By: {latest_data.high_vs[0]["name"].upper()}\n'
|
||||||
|
'\n'
|
||||||
|
f'数据更新时间: {latest_data.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")}'
|
||||||
|
)
|
||||||
|
return message
|
||||||
@@ -6,6 +6,8 @@ from nonebot.compat import PYDANTIC_V2
|
|||||||
from ..templates import templates_dir
|
from ..templates import templates_dir
|
||||||
from .schemas.bind import Bind
|
from .schemas.bind import Bind
|
||||||
from .schemas.tetrio.tetrio_info import Info as TETRIOInfo
|
from .schemas.tetrio.tetrio_info import Info as TETRIOInfo
|
||||||
|
from .schemas.tetrio.tetrio_rank import Data as TETRIORankData
|
||||||
|
from .schemas.tetrio.tetrio_rank_detail import Data as TETRIORankDetailData
|
||||||
from .schemas.tetrio.tetrio_record_blitz import Record as TETRIORecordBlitz
|
from .schemas.tetrio.tetrio_record_blitz import Record as TETRIORecordBlitz
|
||||||
from .schemas.tetrio.tetrio_record_sprint import Record as TETRIORecordSprint
|
from .schemas.tetrio.tetrio_record_sprint import Record as TETRIORecordSprint
|
||||||
from .schemas.tetrio.tetrio_user_info_v2 import Info as TETRIOUserInfoV2
|
from .schemas.tetrio.tetrio_user_info_v2 import Info as TETRIOUserInfoV2
|
||||||
@@ -20,34 +22,24 @@ env = Environment(
|
|||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v1/binding'], data: Bind) -> str: ...
|
async def render(render_type: Literal['v1/binding'], data: Bind) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v1/tetrio/info'], data: TETRIOInfo) -> str: ...
|
async def render(render_type: Literal['v1/tetrio/info'], data: TETRIOInfo) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v1/top/info'], data: TOPInfo) -> str: ...
|
async def render(render_type: Literal['v1/top/info'], data: TOPInfo) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v1/tos/info'], data: TOSInfo) -> str: ...
|
async def render(render_type: Literal['v1/tos/info'], data: TOSInfo) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v2/tetrio/user/info'], data: TETRIOUserInfoV2) -> str: ...
|
async def render(render_type: Literal['v2/tetrio/user/info'], data: TETRIOUserInfoV2) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v2/tetrio/user/list'], data: TETRIOUserListV2) -> str: ...
|
async def render(render_type: Literal['v2/tetrio/user/list'], data: TETRIOUserListV2) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v2/tetrio/record/40l'], data: TETRIORecordSprint) -> str: ...
|
async def render(render_type: Literal['v2/tetrio/record/40l'], data: TETRIORecordSprint) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
@overload
|
@overload
|
||||||
async def render(render_type: Literal['v2/tetrio/record/blitz'], data: TETRIORecordBlitz) -> str: ...
|
async def render(render_type: Literal['v2/tetrio/record/blitz'], data: TETRIORecordBlitz) -> str: ...
|
||||||
|
@overload
|
||||||
|
async def render(render_type: Literal['v2/tetrio/rank'], data: TETRIORankData) -> str: ...
|
||||||
|
@overload
|
||||||
|
async def render(render_type: Literal['v2/tetrio/rank/detail'], data: TETRIORankDetailData) -> str: ...
|
||||||
|
|
||||||
|
|
||||||
async def render(
|
async def render(
|
||||||
@@ -60,6 +52,8 @@ async def render(
|
|||||||
'v2/tetrio/user/list',
|
'v2/tetrio/user/list',
|
||||||
'v2/tetrio/record/40l',
|
'v2/tetrio/record/40l',
|
||||||
'v2/tetrio/record/blitz',
|
'v2/tetrio/record/blitz',
|
||||||
|
'v2/tetrio/rank',
|
||||||
|
'v2/tetrio/rank/detail',
|
||||||
],
|
],
|
||||||
data: Bind
|
data: Bind
|
||||||
| TETRIOInfo
|
| TETRIOInfo
|
||||||
@@ -68,7 +62,9 @@ async def render(
|
|||||||
| TETRIOUserInfoV2
|
| TETRIOUserInfoV2
|
||||||
| TETRIOUserListV2
|
| TETRIOUserListV2
|
||||||
| TETRIORecordSprint
|
| TETRIORecordSprint
|
||||||
| TETRIORecordBlitz,
|
| TETRIORecordBlitz
|
||||||
|
| TETRIORankData
|
||||||
|
| TETRIORankDetailData,
|
||||||
) -> str:
|
) -> str:
|
||||||
if PYDANTIC_V2:
|
if PYDANTIC_V2:
|
||||||
return await env.get_template('index.html').render_async(
|
return await env.get_template('index.html').render_async(
|
||||||
|
|||||||
@@ -0,0 +1,25 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from .....games.tetrio.api.typing import ValidRank
|
||||||
|
|
||||||
|
|
||||||
|
class AverageData(BaseModel):
|
||||||
|
pps: float
|
||||||
|
apm: float
|
||||||
|
apl: float
|
||||||
|
vs: float
|
||||||
|
adpl: float
|
||||||
|
|
||||||
|
|
||||||
|
class ItemData(BaseModel):
|
||||||
|
require_tr: float
|
||||||
|
trending: float
|
||||||
|
average_data: AverageData
|
||||||
|
players: int
|
||||||
|
|
||||||
|
|
||||||
|
class Data(BaseModel):
|
||||||
|
items: dict[ValidRank, ItemData]
|
||||||
|
updated_at: datetime
|
||||||
@@ -0,0 +1,31 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
from .....games.tetrio.api.typing import ValidRank
|
||||||
|
|
||||||
|
|
||||||
|
class SpecialData(BaseModel):
|
||||||
|
apm: float
|
||||||
|
pps: float
|
||||||
|
lpm: float
|
||||||
|
vs: float
|
||||||
|
adpm: float
|
||||||
|
apl: float | None = None
|
||||||
|
adpl: float | None = None
|
||||||
|
apm_holder: str | None = None
|
||||||
|
pps_holder: str | None = None
|
||||||
|
vs_holder: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
class Data(BaseModel):
|
||||||
|
name: ValidRank
|
||||||
|
trending: float
|
||||||
|
require_tr: float
|
||||||
|
players: int
|
||||||
|
|
||||||
|
minimum_data: SpecialData
|
||||||
|
average_data: SpecialData
|
||||||
|
maximum_data: SpecialData
|
||||||
|
|
||||||
|
updated_at: datetime
|
||||||
Reference in New Issue
Block a user