适配新赛季 rank

This commit is contained in:
2024-08-24 21:06:45 +08:00
parent 97e2abed78
commit 6293d088db
21 changed files with 778 additions and 193 deletions

View File

@@ -0,0 +1,119 @@
"""add TETRIOLeagueStats
迁移 ID: 5a1b93948494
父迁移: cfeab6961dce
创建时间: 2024-08-24 00:22:41.359500
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = '5a1b93948494'
down_revision: str | Sequence[str] | None = 'cfeab6961dce'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
'nonebot_plugin_tetris_stats_tetrioleaguestats',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('update_time', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguestats')),
info={'bind_key': 'nonebot_plugin_tetris_stats'},
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestats', schema=None) as batch_op:
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestats_update_time'), ['update_time'], unique=False
)
op.create_table(
'nonebot_plugin_tetris_stats_tetrioleaguehistorical',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('request_id', sa.Uuid(), nullable=False),
sa.Column('data', sa.JSON(), nullable=False),
sa.Column('update_time', sa.DateTime(), nullable=False),
sa.Column('stats_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
['stats_id'],
['nonebot_plugin_tetris_stats_tetrioleaguestats.id'],
name=op.f(
'fk_nonebot_plugin_tetris_stats_tetrioleaguehistorical_stats_id_nonebot_plugin_tetris_stats_tetrioleaguestats'
),
),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguehistorical')),
info={'bind_key': 'nonebot_plugin_tetris_stats'},
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical', schema=None) as batch_op:
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_request_id'), ['request_id'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_update_time'),
['update_time'],
unique=False,
)
op.create_table(
'nonebot_plugin_tetris_stats_tetrioleaguestatsfield',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('rank', sa.String(length=2), nullable=False),
sa.Column('tr_line', sa.Float(), nullable=False),
sa.Column('player_count', sa.Integer(), nullable=False),
sa.Column('low_pps', sa.JSON(), nullable=False),
sa.Column('low_apm', sa.JSON(), nullable=False),
sa.Column('low_vs', sa.JSON(), nullable=False),
sa.Column('avg_pps', sa.Float(), nullable=False),
sa.Column('avg_apm', sa.Float(), nullable=False),
sa.Column('avg_vs', sa.Float(), nullable=False),
sa.Column('high_pps', sa.JSON(), nullable=False),
sa.Column('high_apm', sa.JSON(), nullable=False),
sa.Column('high_vs', sa.JSON(), nullable=False),
sa.Column('stats_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(
['stats_id'],
['nonebot_plugin_tetris_stats_tetrioleaguestats.id'],
name=op.f(
'fk_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_stats_id_nonebot_plugin_tetris_stats_tetrioleaguestats'
),
),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguestatsfield')),
info={'bind_key': 'nonebot_plugin_tetris_stats'},
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield', schema=None) as batch_op:
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_rank'), ['rank'], unique=False
)
# ### end Alembic commands ###
def downgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_rank'))
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield')
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_update_time'))
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_request_id'))
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical')
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestats', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestats_update_time'))
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguestats')
# ### end Alembic commands ###

View File

@@ -23,7 +23,7 @@ command = Subcommand(
)
from . import bind, config, query, record # noqa: E402
from . import bind, config, query, rank, record # noqa: E402
main_command.add(command)
@@ -32,5 +32,6 @@ __all__ = [
'bind',
'config',
'query',
'rank',
'record',
]

View File

@@ -1,6 +1,5 @@
from .player import Player
from .schemas.user import User
from .schemas.user_info import UserInfoSuccess
from .tetra_league import full_export as tetra_league_full_export
__all__ = ['Player', 'User', 'UserInfoSuccess', 'tetra_league_full_export']
__all__ = ['Player', 'User', 'UserInfoSuccess']

View File

@@ -22,13 +22,13 @@ class Cache:
task: ClassVar[WeakValueDictionary[URL, Lock]] = WeakValueDictionary()
@classmethod
async def get(cls, url: URL) -> bytes:
async def get(cls, url: URL, extra_headers: dict | None = None) -> bytes:
lock = cls.task.setdefault(url, Lock())
async with lock:
if (cached_data := await cls.cache.get(url)) is not None:
logger.debug(f'{url}: Cache hit!')
return cached_data
response_data = await request.request(url)
response_data = await request.request(url, extra_headers, enable_anti_cloudflare=True)
parsed_data: SuccessModel | FailedModel = type_validate_json(SuccessModel | FailedModel, response_data) # type: ignore[arg-type]
if isinstance(parsed_data, SuccessModel):
await cls.cache.add(

View File

@@ -0,0 +1,90 @@
from typing import Literal, overload
from uuid import UUID
from msgspec import to_builtins
from nonebot.compat import type_validate_json
from yarl import URL
from ....utils.exception import RequestError
from ..constant import BASE_URL
from .cache import Cache
from .schemas.base import FailedModel
from .schemas.leaderboards import Parameter
from .schemas.leaderboards.by import By, BySuccessModel
from .schemas.leaderboards.solo import Solo, SoloSuccessModel
from .schemas.leaderboards.zenith import Zenith, ZenithSuccessModel
async def by(
by_type: Literal['league', 'xp', 'ar'], parameter: Parameter, x_session_id: UUID | None = None
) -> BySuccessModel:
model: By = type_validate_json(
By, # type: ignore[arg-type]
await get(
BASE_URL / f'users/by/{by_type}',
parameter,
{'X-Session-ID': str(x_session_id)} if x_session_id is not None else None,
),
)
if isinstance(model, FailedModel):
msg = f'排行榜信息请求错误:\n{model.error}'
raise RequestError(msg)
return model
@overload
async def records(
records_type: Literal['40l', 'blitz'],
scope: str = '_global',
revolution_id: str | None = None,
*,
parameter: Parameter,
) -> SoloSuccessModel: ...
@overload
async def records(
records_type: Literal['zenith', 'zenithex'],
scope: str = '_global',
revolution_id: str | None = None,
*,
parameter: Parameter,
) -> ZenithSuccessModel: ...
async def records(
records_type: Literal['40l', 'blitz', 'zenith', 'zenithex'],
scope: str = '_global',
revolution_id: str | None = None,
*,
parameter: Parameter,
) -> SoloSuccessModel | ZenithSuccessModel:
model: Solo | Zenith
match records_type:
case '40l' | 'blitz':
model = type_validate_json(
Solo, # type: ignore[arg-type]
await get(
BASE_URL / 'records' / f'{records_type}{scope}{revolution_id if revolution_id is not None else ""}',
parameter,
),
)
case 'zenith' | 'zenithex':
model = type_validate_json(
Zenith, # type: ignore[arg-type]
await get(
BASE_URL / 'records' / f'{records_type}{scope}{revolution_id if revolution_id is not None else ""}',
parameter,
),
)
case _:
msg = f'records_type: {records_type} is not supported'
raise ValueError(msg)
if isinstance(model, FailedModel):
msg = f'排行榜信息请求错误:\n{model.error}' # type: ignore[attr-defined]
raise RequestError(msg)
return model
async def get(url: URL, parameter: Parameter, extra_headers: dict | None = None) -> bytes:
return await Cache.get(url % to_builtins(parameter), extra_headers)

View File

@@ -0,0 +1,10 @@
from typing import Annotated
from msgspec import Meta, Struct
class Parameter(Struct, omit_defaults=True):
after: str | None = None
before: str | None = None
limit: Annotated[int, Meta(ge=1, le=100)] = 25
country: str | None = None

View File

@@ -1,27 +0,0 @@
from pydantic import BaseModel, Field
from ..base import SuccessModel
from .base import Entry as BaseEntry
class ArCounts(BaseModel):
bronze: int | None = Field(None, alias='1')
silver: int | None = Field(None, alias='2')
gold: int | None = Field(None, alias='3')
platinum: int | None = Field(None, alias='4')
diamond: int | None = Field(None, alias='5')
issued: int | None = Field(None, alias='100')
top10: int | None = Field(None, alias='t10')
class Entry(BaseEntry):
ar: int
ar_counts: ArCounts
class Data(BaseModel):
entries: list[Entry]
class ArSuccessModel(SuccessModel):
data: Data

View File

@@ -1,30 +0,0 @@
from datetime import datetime
from pydantic import BaseModel, Field
from ...typing import Rank
from ..base import P
class League(BaseModel):
gamesplayed: int
gameswon: int
rating: int
rank: Rank
decaying: bool
class Entry(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: str
xp: float
league: League
supporter: bool | None = None
verified: bool
country: str | None = None
ts: datetime
gamesplayed: int
gameswon: int
gametime: float
p: P

View File

@@ -0,0 +1,65 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, Field
from ...typing import Rank, ValidRank
from ..base import FailedModel, P, SuccessModel
class ArCounts(BaseModel):
bronze: int | None = Field(default=None, alias='1')
silver: int | None = Field(default=None, alias='2')
gold: int | None = Field(default=None, alias='3')
platinum: int | None = Field(default=None, alias='4')
diamond: int | None = Field(default=None, alias='5')
issued: int | None = Field(default=None, alias='100')
top3: int | None = Field(default=None, alias='t3')
top5: int | None = Field(default=None, alias='t5')
top10: int | None = Field(default=None, alias='t10')
top25: int | None = Field(default=None, alias='t25')
top50: int | None = Field(default=None, alias='t50')
top100: int | None = Field(default=None, alias='t100')
class League(BaseModel):
gamesplayed: int
gameswon: int
tr: float
gxe: float
rank: Rank
bestrank: ValidRank
glicko: float
rd: float
apm: float
pps: float
vs: float
decaying: bool
class Entry(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop']
ts: datetime | None = None
xp: float
country: str | None = None
supporter: bool | None = None
league: League
gamesplayed: int
gameswon: int
gametime: float
ar: int
ar_counts: ArCounts
p: P
class Data(BaseModel):
entries: list[Entry]
class BySuccessModel(SuccessModel):
data: Data
By = BySuccessModel | FailedModel

View File

@@ -1,6 +1,6 @@
from pydantic import BaseModel
from ..base import SuccessModel
from ..base import FailedModel, SuccessModel
from ..summaries.solo import Record
@@ -10,3 +10,6 @@ class Data(BaseModel):
class SoloSuccessModel(SuccessModel):
data: Data
Solo = SoloSuccessModel | FailedModel

View File

@@ -1,12 +0,0 @@
from pydantic import BaseModel
from ..base import SuccessModel
from .base import Entry
class Data(BaseModel):
entries: list[Entry]
class XpSuccessModel(SuccessModel):
data: Data

View File

@@ -1,6 +1,6 @@
from pydantic import BaseModel
from ..base import SuccessModel
from ..base import FailedModel, SuccessModel
from ..summaries.zenith import Record
@@ -10,3 +10,6 @@ class Data(BaseModel):
class ZenithSuccessModel(SuccessModel):
data: Data
Zenith = ZenithSuccessModel | FailedModel

View File

@@ -1,59 +0,0 @@
from pydantic import BaseModel, Field
from ..typing import Rank
from .base import FailedModel
from .base import SuccessModel as BaseSuccessModel
class _User(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: str
xp: float
supporter: bool | None = None
verified: bool
country: str | None = None
class _League(BaseModel):
gamesplayed: int
gameswon: int
rating: float
rank: Rank
bestrank: Rank
decaying: bool
class ValidLeague(_League):
glicko: float
rd: float
apm: float
pps: float
vs: float
class ValidUser(_User):
league: ValidLeague
class InvalidLeague(_League):
glicko: float | None = None
rd: float | None = None
apm: float | None = None
pps: float | None = None
vs: float | None = None
class InvalidUser(_User):
league: InvalidLeague
class Data(BaseModel):
users: list[ValidUser | InvalidUser]
class TetraLeagueSuccess(BaseSuccessModel):
data: Data
TetraLeague = TetraLeagueSuccess | FailedModel

View File

@@ -1,55 +0,0 @@
from typing import Literal, NamedTuple, overload
from msgspec import Struct, to_builtins
from nonebot.compat import type_validate_json
from ....utils.exception import RequestError
from ..constant import BASE_URL
from .cache import Cache
from .schemas.base import FailedModel
from .schemas.tetra_league import TetraLeague, TetraLeagueSuccess
class Parameter(Struct, omit_defaults=True):
after: float | None = None
before: float | None = None
limit: int | None = None
country: str | None = None
async def leaderboard(parameter: Parameter | None = None) -> TetraLeagueSuccess:
league: TetraLeague = type_validate_json(
TetraLeague, # type: ignore[arg-type]
(await Cache.get(BASE_URL / 'users/lists/league' % to_builtins(parameter))),
)
if isinstance(league, FailedModel):
msg = f'排行榜数据请求错误:\n{league.error}'
raise RequestError(msg)
return league
class FullExport(NamedTuple):
model: TetraLeagueSuccess
original: bytes
@overload
async def full_export(*, with_original: Literal[False]) -> TetraLeagueSuccess: ...
@overload
async def full_export(*, with_original: Literal[True]) -> FullExport: ...
async def full_export(*, with_original: bool) -> TetraLeagueSuccess | FullExport:
full: TetraLeague = type_validate_json(
TetraLeague, # type: ignore[arg-type]
(data := await Cache.get(BASE_URL / 'users/lists/league/all')),
)
if isinstance(full, FailedModel):
msg = f'排行榜数据请求错误:\n{full.error}'
raise RequestError(msg)
if with_original:
return FullExport(full, data)
return full

View File

@@ -10,6 +10,7 @@ GAME_TYPE: Literal['IO'] = 'IO'
BASE_URL = URL('https://ch.tetr.io/api/')
RANK_PERCENTILE: dict[ValidRank, float] = {
'x+': 0.2,
'x': 1,
'u': 5,
'ss': 11,

View File

@@ -1,10 +1,53 @@
from nonebot_plugin_orm import Model
from sqlalchemy import String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from datetime import datetime
from uuid import UUID
from nonebot_plugin_orm import Model
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column, relationship
from ...db.models import PydanticType
from .api.schemas.leaderboards.by import BySuccessModel, Entry
from .api.typing import ValidRank
from .typing import Template
class TETRIOUserConfig(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(primary_key=True)
query_template: Mapped[Template] = mapped_column(String(2))
class TETRIOLeagueStats(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
raw: Mapped[list['TETRIOLeagueHistorical']] = relationship(back_populates='stats', lazy='noload')
fields: Mapped[list['TETRIOLeagueStatsField']] = relationship(back_populates='stats')
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
class TETRIOLeagueHistorical(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
request_id: Mapped[UUID] = mapped_column(index=True)
data: Mapped[BySuccessModel] = mapped_column(PydanticType([], {BySuccessModel}))
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
stats_id: Mapped[int] = mapped_column(ForeignKey('nonebot_plugin_tetris_stats_tetrioleaguestats.id'), init=False)
stats: Mapped['TETRIOLeagueStats'] = relationship(back_populates='raw')
entry_type = PydanticType([], {Entry})
class TETRIOLeagueStatsField(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
rank: Mapped[ValidRank] = mapped_column(String(2), index=True)
tr_line: Mapped[float]
player_count: Mapped[int]
low_pps: Mapped[Entry] = mapped_column(entry_type)
low_apm: Mapped[Entry] = mapped_column(entry_type)
low_vs: Mapped[Entry] = mapped_column(entry_type)
avg_pps: Mapped[float]
avg_apm: Mapped[float]
avg_vs: Mapped[float]
high_pps: Mapped[Entry] = mapped_column(entry_type)
high_apm: Mapped[Entry] = mapped_column(entry_type)
high_vs: Mapped[Entry] = mapped_column(entry_type)
stats_id: Mapped[int] = mapped_column(ForeignKey('nonebot_plugin_tetris_stats_tetrioleaguestats.id'), init=False)
stats: Mapped['TETRIOLeagueStats'] = relationship(back_populates='fields')

View File

@@ -0,0 +1,156 @@
from collections import defaultdict
from collections.abc import Callable, Sequence
from datetime import datetime, timedelta, timezone
from math import floor
from statistics import mean
from typing import TYPE_CHECKING
from uuid import uuid4
from nonebot import get_driver
from nonebot_plugin_alconna import Subcommand
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_orm import get_session
from sqlalchemy import select
from ....utils.exception import RequestError
from ....utils.limit import limit
from ....utils.retry import retry
from .. import alc
from .. import command as base_command
from ..api.leaderboards import by
from ..api.schemas.base import P
from ..api.schemas.leaderboards import Parameter
from ..api.schemas.leaderboards.by import Entry
from ..constant import RANK_PERCENTILE
from ..models import TETRIOLeagueHistorical, TETRIOLeagueStats, TETRIOLeagueStatsField
if TYPE_CHECKING:
from ..api.schemas.leaderboards.by import BySuccessModel
from ..api.typing import Rank
UTC = timezone.utc
driver = get_driver()
command = Subcommand('rank', help_text='查询 TETR.IO 段位信息')
def wrapper(slot: int | str, content: str | None) -> str | None:
if slot == 'rank' and not content:
return '--all'
if content is not None:
return f'--detail {content.lower()}'
return content
alc.shortcut(
r'(?i:io)(?i:段位|段|rank)\s*(?P<rank>[a-zA-Z+-]{0,2})',
command='tstats TETR.IO rank {rank}',
humanized='iorank',
fuzzy=False,
wrapper=wrapper,
)
def _pps(user: Entry) -> float:
return user.league.pps
def _apm(user: Entry) -> float:
return user.league.apm
def _vs(user: Entry) -> float:
return user.league.vs
def _min(users: Sequence[Entry], field: Callable[[Entry], float]) -> Entry:
return min(users, key=field)
def _max(users: Sequence[Entry], field: Callable[[Entry], float]) -> Entry:
return max(users, key=field)
def find_special_player(
users: Sequence[Entry],
field: Callable[[Entry], float],
sort: Callable[[Sequence[Entry], Callable[[Entry], float]], Entry],
) -> Entry:
return sort(users, field)
@scheduler.scheduled_job('cron', hour='0,6,12,18', minute=0)
async def get_tetra_league_data() -> None:
x_session_id = uuid4()
limit_by = retry(max_attempts=10, exception_type=RequestError)(limit(timedelta(seconds=1))(by))
prisecter = P(pri=9007199254740991, sec=9007199254740991, ter=9007199254740991) # * from ch.tetr.io
results: list[BySuccessModel] = []
while True:
model = await limit_by(
'league', Parameter(after=f'{prisecter.pri}:{prisecter.sec}:{prisecter.ter}', limit=100), x_session_id
)
prisecter = model.data.entries[-1].p
results.append(model)
if len(model.data.entries) < 100: # 分页值 # noqa: PLR2004
break
players: list[Entry] = []
for result in results:
players.extend(result.data.entries)
players.sort(key=lambda x: x.league.tr, reverse=True)
rank_player_mapping: defaultdict[Rank, list[Entry]] = defaultdict(list)
for player in players:
rank_player_mapping[player.league.rank].append(player)
stats = TETRIOLeagueStats(raw=[], fields=[], update_time=datetime.now(UTC))
fields: list[TETRIOLeagueStatsField] = []
for rank, percentile in RANK_PERCENTILE.items():
offset = floor((percentile / 100) * len(players)) - 1
tr_line = players[offset].league.tr
rank_players = rank_player_mapping[rank]
fields.append(
TETRIOLeagueStatsField(
rank=rank,
tr_line=tr_line,
player_count=len(rank_players),
low_pps=find_special_player(rank_players, _pps, _min),
low_apm=find_special_player(rank_players, _apm, _min),
low_vs=find_special_player(rank_players, _vs, _min),
avg_pps=mean(_pps(i) for i in rank_players),
avg_apm=mean(_apm(i) for i in rank_players),
avg_vs=mean(_vs(i) for i in rank_players),
high_pps=find_special_player(rank_players, _pps, _max),
high_apm=find_special_player(rank_players, _apm, _max),
high_vs=find_special_player(rank_players, _vs, _max),
stats=stats,
)
)
historicals = [
TETRIOLeagueHistorical(request_id=x_session_id, data=model, update_time=model.cache.cached_at, stats=stats)
for model in results
]
stats.raw = historicals
stats.fields = fields
async with get_session() as session:
session.add(stats)
await session.commit()
@driver.on_startup
async def _() -> None:
async with get_session() as session:
latest_time = await session.scalar(
select(TETRIOLeagueStats.update_time).order_by(TETRIOLeagueStats.id.desc()).limit(1)
)
if latest_time is None or datetime.now(tz=UTC) - latest_time.replace(tzinfo=UTC) > timedelta(hours=6):
await get_tetra_league_data()
from . import all, detail # noqa: E402
base_command.add(command)
__all__ = ['all', 'detail']

View File

@@ -0,0 +1,115 @@
from datetime import timedelta
from arclet.alconna import Arg
from nonebot_plugin_alconna import Option, Subcommand, UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
from ....db import trigger
from ....utils.host import HostPage, get_self_netloc
from ....utils.metrics import get_metrics
from ....utils.render import render
from ....utils.render.schemas.tetrio.rank.v1 import Data as DataV1
from ....utils.render.schemas.tetrio.rank.v1 import ItemData as ItemDataV1
from ....utils.render.schemas.tetrio.rank.v2 import AverageData as AverageDataV2
from ....utils.render.schemas.tetrio.rank.v2 import Data as DataV2
from ....utils.render.schemas.tetrio.rank.v2 import ItemData as ItemDataV2
from ....utils.screenshot import screenshot
from .. import alc
from ..constant import GAME_TYPE
from ..models import TETRIOLeagueStats
from ..typing import Template
from . import command
command.add(
Subcommand(
'--all', Option('--template', Arg('template', Template), alias=['-T'], help_text='要使用的查询模板'), dest='all'
)
)
@alc.assign('TETRIO.rank.all')
async def _(event_session: EventSession, template: Template = 'v1'):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='rank',
command_args=['--all'],
):
async with get_session() as session:
latest_data = (
await session.scalars(
select(TETRIOLeagueStats)
.order_by(TETRIOLeagueStats.id.desc())
.limit(1)
.options(selectinload(TETRIOLeagueStats.fields))
)
).one()
compare_data = (
await session.scalars(
select(TETRIOLeagueStats)
.order_by(
func.abs(
func.julianday(TETRIOLeagueStats.update_time)
- func.julianday(latest_data.update_time - timedelta(hours=24))
)
)
.limit(1)
.options(selectinload(TETRIOLeagueStats.fields))
)
).one()
match template:
case 'v1':
await UniMessage.image(raw=await make_image_v1(latest_data, compare_data)).finish()
case 'v2':
await UniMessage.image(raw=await make_image_v2(latest_data, compare_data)).finish()
async def make_image_v1(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeagueStats) -> bytes:
async with HostPage(
await render(
'v1/tetrio/rank',
DataV1(
items={
i[0].rank: ItemDataV1(
trending=round(i[0].tr_line - i[1].tr_line, 2),
require_tr=round(i[0].tr_line, 2),
players=i[0].player_count,
)
for i in zip(latest_data.fields, compare_data.fields, strict=True)
},
updated_at=latest_data.update_time,
),
)
) as page_hash:
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
async def make_image_v2(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeagueStats) -> bytes:
async with HostPage(
await render(
'v2/tetrio/rank',
DataV2(
items={
i[0].rank: ItemDataV2(
require_tr=round(i[0].tr_line, 2),
trending=round(i[0].tr_line - i[1].tr_line, 2),
average_data=AverageDataV2(
pps=(metrics := get_metrics(pps=i[0].avg_pps, apm=i[0].avg_apm, vs=i[0].avg_vs)).pps,
apm=metrics.apm,
apl=metrics.apl,
vs=metrics.vs,
adpl=metrics.adpl,
),
players=i[0].player_count,
)
for i in zip(latest_data.fields, compare_data.fields, strict=True)
},
updated_at=latest_data.update_time,
),
)
) as page_hash:
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')

View File

@@ -0,0 +1,128 @@
from datetime import timedelta, timezone
from zoneinfo import ZoneInfo
from arclet.alconna import Arg
from nonebot import get_driver
from nonebot_plugin_alconna import Option, UniMessage
from nonebot_plugin_orm import get_session
from nonebot_plugin_session import EventSession
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
from sqlalchemy import func, select
from sqlalchemy.orm import selectinload
from ....db import trigger
from ....utils.host import HostPage, get_self_netloc
from ....utils.metrics import get_metrics
from ....utils.render import render
from ....utils.render.schemas.tetrio.rank.detail import Data, SpecialData
from ....utils.screenshot import screenshot
from .. import alc
from ..api.typing import ValidRank
from ..constant import GAME_TYPE
from ..models import TETRIOLeagueStats
from . import command
UTC = timezone.utc
driver = get_driver()
command.add(Option('--detail', Arg('rank', ValidRank), alias=['-D']))
@alc.assign('TETRIO.rank')
async def _(rank: ValidRank, event_session: EventSession):
async with trigger(
session_persist_id=await get_session_persist_id(event_session),
game_platform=GAME_TYPE,
command_type='rank',
command_args=[f'--detail {rank}'],
):
async with get_session() as session:
latest_data = (
await session.scalars(
select(TETRIOLeagueStats)
.order_by(TETRIOLeagueStats.id.desc())
.limit(1)
.options(selectinload(TETRIOLeagueStats.fields))
)
).one()
compare_data = (
await session.scalars(
select(TETRIOLeagueStats)
.order_by(
func.abs(
func.julianday(TETRIOLeagueStats.update_time)
- func.julianday(latest_data.update_time - timedelta(hours=24))
)
)
.limit(1)
.options(selectinload(TETRIOLeagueStats.fields))
)
).one()
await UniMessage.image(
raw=await make_image(
rank,
latest_data,
compare_data,
)
).finish()
async def make_image(rank: ValidRank, latest: TETRIOLeagueStats, compare: TETRIOLeagueStats) -> bytes:
latest_data = next(filter(lambda x: x.rank == rank, latest.fields))
compare_data = next(filter(lambda x: x.rank == rank, compare.fields))
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
low_pps = get_metrics(
pps=latest_data.low_pps.league.pps, apm=latest_data.low_pps.league.apm, vs=latest_data.low_pps.league.vs
)
low_apm = get_metrics(
pps=latest_data.low_apm.league.pps, apm=latest_data.low_apm.league.apm, vs=latest_data.low_apm.league.vs
)
low_vs = get_metrics(
pps=latest_data.low_vs.league.pps, apm=latest_data.low_vs.league.apm, vs=latest_data.low_vs.league.vs
)
max_pps = get_metrics(
pps=latest_data.high_pps.league.pps, apm=latest_data.high_pps.league.apm, vs=latest_data.high_pps.league.vs
)
max_apm = get_metrics(
pps=latest_data.high_apm.league.pps, apm=latest_data.high_apm.league.apm, vs=latest_data.high_apm.league.vs
)
max_vs = get_metrics(
pps=latest_data.high_vs.league.pps, apm=latest_data.high_vs.league.apm, vs=latest_data.high_vs.league.vs
)
async with HostPage(
await render(
'v2/tetrio/rank/detail',
Data(
name=latest_data.rank,
trending=round(latest_data.tr_line - compare_data.tr_line, 2),
require_tr=round(latest_data.tr_line, 2),
players=latest_data.player_count,
minimum_data=SpecialData(
apm=low_apm.apm,
pps=low_pps.pps,
lpm=low_pps.lpm,
vs=low_vs.vs,
adpm=low_vs.adpm,
apm_holder=latest_data.low_apm.username.upper(),
pps_holder=latest_data.low_pps.username.upper(),
vs_holder=latest_data.low_vs.username.upper(),
),
average_data=SpecialData(
apm=avg.apm, pps=avg.pps, lpm=avg.lpm, vs=avg.vs, adpm=avg.adpm, apl=avg.apl, adpl=avg.adpl
),
maximum_data=SpecialData(
apm=max_apm.apm,
pps=max_pps.pps,
lpm=max_pps.lpm,
vs=max_vs.vs,
adpm=max_vs.adpm,
apm_holder=latest_data.high_apm.username.upper(),
pps_holder=latest_data.high_pps.username.upper(),
vs_holder=latest_data.high_vs.username.upper(),
),
updated_at=latest.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo('Asia/Shanghai')),
),
)
) as page_hash:
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')

View File

@@ -0,0 +1,33 @@
from asyncio import Lock, sleep
from collections.abc import Callable, Coroutine
from datetime import timedelta
from functools import wraps
from time import time
from typing import Any, ParamSpec, TypeVar
from nonebot.log import logger
P = ParamSpec('P')
T = TypeVar('T')
def limit(limit: timedelta) -> Callable[[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]]:
limit_seconds = limit.total_seconds()
def decorator(func: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, Coroutine[Any, Any, T]]:
last_call = 0.0
lock = Lock()
@wraps(func)
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
nonlocal last_call
async with lock:
if (diff := (time() - last_call)) < limit_seconds:
logger.debug(f'request limit {(limit_time:=limit_seconds-diff)}s')
await sleep(limit_time)
last_call = time()
return await func(*args, **kwargs)
return wrapper
return decorator

View File

@@ -117,6 +117,7 @@ class Request:
async def request(
self,
url: URL,
extra_headers: dict | None = None,
*,
is_json: bool = True,
enable_anti_cloudflare: bool = False,
@@ -128,6 +129,7 @@ class Request:
else:
cookies = None
headers = None
headers = headers if extra_headers is None else extra_headers if headers is None else headers | extra_headers
try:
async with AsyncClient(cookies=cookies, timeout=config.tetris.request_timeout, proxy=self.proxy) as session:
response = await session.get(str(url), headers=headers)