mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
Compare commits
99 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 6867245be3 | |||
|
|
eebff0a8ad | ||
| 74eef41506 | |||
| 5eb4771259 | |||
| 7a3a4d936d | |||
|
|
03ca7c4486 | ||
|
|
b043d1da59 | ||
|
|
c9659201b1 | ||
|
|
617d3ec658 | ||
|
|
57a1992675 | ||
|
|
8d1d2f329e | ||
|
|
fa6cbd5c6d | ||
|
|
9f0f0b87f4 | ||
|
|
96c298b1b8 | ||
|
|
df5ced235d | ||
|
|
af83c7a2d9 | ||
|
|
bc41a91034 | ||
|
|
d97291d1bc | ||
| 5b56de9de1 | |||
| 0898a81331 | |||
|
|
d464059c0a | ||
|
|
6ea8b9328c | ||
|
|
773ff5545c | ||
|
|
94710b938b | ||
| ec09bb734d | |||
|
|
9e9a642847 | ||
|
|
04e0b14e72 | ||
|
|
20ce9c64be | ||
|
|
8af07bf031 | ||
|
|
3a904f67ad | ||
|
|
fc9b751ac4 | ||
| cb4c6b96f0 | |||
|
|
25c3777c0f | ||
|
|
193fd1da2a | ||
|
|
2cd609dd40 | ||
| a206098805 | |||
|
|
d493ba5f0d | ||
|
|
581d1f9674 | ||
|
|
01c99e8a8c | ||
|
|
eb3f4bea04 | ||
|
|
ebbbd68b05 | ||
|
|
10e0eb815e | ||
|
|
a57b04e181 | ||
|
|
cc2e71f1a5 | ||
|
|
3384263bb2 | ||
|
|
68f210dc4f | ||
|
|
00a85fe3e9 | ||
|
|
a10a7584ae | ||
|
|
95aac5e321 | ||
| 89d8c938e2 | |||
| 84db42f1ce | |||
|
|
0a660922bb | ||
|
|
56bc98cc79 | ||
|
|
be61683b51 | ||
| ccd5706a95 | |||
| b69240caa5 | |||
| 49d00f4d0e | |||
| 389a850025 | |||
| 20dcc2bc3d | |||
| 606dddbca2 | |||
| f509b03cd0 | |||
| 6293d088db | |||
| 97e2abed78 | |||
|
|
5ea3fcb234 | ||
|
|
ca33ba1310 | ||
| 3629a2ff4a | |||
| a2108c9776 | |||
| 7133cd9384 | |||
|
|
406bc7674e | ||
|
|
259b38fda5 | ||
|
|
414345ae5c | ||
|
|
341cbd86cd | ||
|
|
bf7804738e | ||
|
|
553f373671 | ||
| e53e164a52 | |||
| 2cd7d89c3e | |||
| b8b6d5f6c8 | |||
| 7a44c0dca5 | |||
| 4155d8eb42 | |||
| 4cc942d226 | |||
| 996dd565d8 | |||
| 5b0660e45b | |||
| 8d1ebc06d1 | |||
| c57aa48048 | |||
| ad90562fdf | |||
| cbc96fc09e | |||
| 8e10cfe0d0 | |||
| d192f0506d | |||
| 44aed656b8 | |||
| feb662b980 | |||
| ed6eb9a5cf | |||
| 25e281a4c5 | |||
| a2d69b9113 | |||
| c8907a47a4 | |||
| 9fb176b4bc | |||
|
|
53740265b6 | ||
|
|
e6119074ce | ||
| f7a2e89274 | |||
| 3fe5a19c4a |
7
.github/workflows/TypeCheck.yml
vendored
7
.github/workflows/TypeCheck.yml
vendored
@@ -4,7 +4,7 @@ on:
|
||||
push:
|
||||
|
||||
jobs:
|
||||
Mypy:
|
||||
TypeCheck:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
@@ -25,3 +25,8 @@ jobs:
|
||||
shell: bash
|
||||
run: |
|
||||
poetry run mypy ./nonebot_plugin_tetris_stats
|
||||
|
||||
- name: Run BasedPyright
|
||||
shell: bash
|
||||
run: |
|
||||
poetry run basedpyright ./nonebot_plugin_tetris_stats/
|
||||
|
||||
@@ -7,13 +7,13 @@ ci:
|
||||
autoupdate_commit_msg: ':arrow_up: auto update by pre-commit hooks'
|
||||
repos:
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
rev: v0.5.7
|
||||
rev: v0.6.9
|
||||
hooks:
|
||||
- id: ruff
|
||||
args: [--fix, --exit-non-zero-on-fix]
|
||||
stages: [commit]
|
||||
stages: [pre-commit]
|
||||
- id: ruff-format
|
||||
stages: [commit]
|
||||
stages: [pre-commit]
|
||||
|
||||
- repo: https://github.com/nonebot/nonemoji
|
||||
rev: v0.1.4
|
||||
|
||||
@@ -1,13 +1,27 @@
|
||||
from pathlib import Path
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_localstore import get_plugin_cache_dir, get_plugin_data_dir
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from nonebot_plugin_localstore import get_cache_dir
|
||||
from pydantic import BaseModel
|
||||
CACHE_PATH = get_plugin_cache_dir()
|
||||
DATA_PATH = get_plugin_data_dir()
|
||||
|
||||
CACHE_PATH: Path = get_cache_dir('nonebot_plugin_tetris_stats')
|
||||
|
||||
class Proxy(BaseModel):
|
||||
main: str | None = None
|
||||
github: str | None = None
|
||||
tetrio: str | None = None
|
||||
tos: str | None = None
|
||||
top: str | None = None
|
||||
|
||||
|
||||
class ScopedConfig(BaseModel):
|
||||
request_timeout: float = 30.0
|
||||
screenshot_quality: float = 2
|
||||
proxy: Proxy = Field(default_factory=Proxy)
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
"""配置类"""
|
||||
tetris: ScopedConfig = Field(default_factory=ScopedConfig)
|
||||
|
||||
tetris_req_timeout: float = 30.0
|
||||
tetris_screenshot_quality: float = 2
|
||||
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
@@ -19,7 +19,6 @@ from sqlalchemy import desc, select
|
||||
from sqlalchemy.dialects import sqlite
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
from ujson import dumps, loads
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
@@ -30,7 +29,9 @@ branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def migrate_old_data() -> None:
|
||||
def migrate_old_data() -> None: # noqa: C901
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
Base.prepare(autoload_with=op.get_bind())
|
||||
OldHistoricalData = Base.classes.nonebot_plugin_tetris_stats_historicaldata # noqa: N806
|
||||
@@ -46,6 +47,9 @@ def migrate_old_data() -> None:
|
||||
TimeRemainingColumn(),
|
||||
) as progress,
|
||||
):
|
||||
if session.query(OldHistoricalData).count() == 0:
|
||||
logger.info('空表, 跳过')
|
||||
return
|
||||
task_id = progress.add_task('[cyan]Migrating:', total=session.query(OldHistoricalData).count())
|
||||
pointer = 0
|
||||
while pointer < session.query(OldHistoricalData).order_by(desc(OldHistoricalData.id)).limit(1).one().id:
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
"""add TETRIOLeagueStats
|
||||
|
||||
迁移 ID: 5a1b93948494
|
||||
父迁移: cfeab6961dce
|
||||
创建时间: 2024-08-24 00:22:41.359500
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
revision: str = '5a1b93948494'
|
||||
down_revision: str | Sequence[str] | None = 'cfeab6961dce'
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_tetrioleaguestats',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('update_time', sa.DateTime(), nullable=False),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguestats')),
|
||||
info={'bind_key': 'nonebot_plugin_tetris_stats'},
|
||||
)
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestats', schema=None) as batch_op:
|
||||
batch_op.create_index(
|
||||
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestats_update_time'), ['update_time'], unique=False
|
||||
)
|
||||
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_tetrioleaguehistorical',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('request_id', sa.Uuid(), nullable=False),
|
||||
sa.Column('data', sa.JSON(), nullable=False),
|
||||
sa.Column('update_time', sa.DateTime(), nullable=False),
|
||||
sa.Column('stats_id', sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
['stats_id'],
|
||||
['nonebot_plugin_tetris_stats_tetrioleaguestats.id'],
|
||||
name=op.f(
|
||||
'fk_nonebot_plugin_tetris_stats_tetrioleaguehistorical_stats_id_nonebot_plugin_tetris_stats_tetrioleaguestats'
|
||||
),
|
||||
),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguehistorical')),
|
||||
info={'bind_key': 'nonebot_plugin_tetris_stats'},
|
||||
)
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical', schema=None) as batch_op:
|
||||
batch_op.create_index(
|
||||
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_request_id'), ['request_id'], unique=False
|
||||
)
|
||||
batch_op.create_index(
|
||||
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_update_time'),
|
||||
['update_time'],
|
||||
unique=False,
|
||||
)
|
||||
|
||||
op.create_table(
|
||||
'nonebot_plugin_tetris_stats_tetrioleaguestatsfield',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('rank', sa.String(length=2), nullable=False),
|
||||
sa.Column('tr_line', sa.Float(), nullable=False),
|
||||
sa.Column('player_count', sa.Integer(), nullable=False),
|
||||
sa.Column('low_pps', sa.JSON(), nullable=False),
|
||||
sa.Column('low_apm', sa.JSON(), nullable=False),
|
||||
sa.Column('low_vs', sa.JSON(), nullable=False),
|
||||
sa.Column('avg_pps', sa.Float(), nullable=False),
|
||||
sa.Column('avg_apm', sa.Float(), nullable=False),
|
||||
sa.Column('avg_vs', sa.Float(), nullable=False),
|
||||
sa.Column('high_pps', sa.JSON(), nullable=False),
|
||||
sa.Column('high_apm', sa.JSON(), nullable=False),
|
||||
sa.Column('high_vs', sa.JSON(), nullable=False),
|
||||
sa.Column('stats_id', sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(
|
||||
['stats_id'],
|
||||
['nonebot_plugin_tetris_stats_tetrioleaguestats.id'],
|
||||
name=op.f(
|
||||
'fk_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_stats_id_nonebot_plugin_tetris_stats_tetrioleaguestats'
|
||||
),
|
||||
),
|
||||
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetrioleaguestatsfield')),
|
||||
info={'bind_key': 'nonebot_plugin_tetris_stats'},
|
||||
)
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield', schema=None) as batch_op:
|
||||
batch_op.create_index(
|
||||
batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_rank'), ['rank'], unique=False
|
||||
)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield', schema=None) as batch_op:
|
||||
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestatsfield_rank'))
|
||||
|
||||
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguestatsfield')
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical', schema=None) as batch_op:
|
||||
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_update_time'))
|
||||
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguehistorical_request_id'))
|
||||
|
||||
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguehistorical')
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetrioleaguestats', schema=None) as batch_op:
|
||||
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_tetrioleaguestats_update_time'))
|
||||
|
||||
op.drop_table('nonebot_plugin_tetris_stats_tetrioleaguestats')
|
||||
# ### end Alembic commands ###
|
||||
@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING
|
||||
from alembic import op
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
from ujson import dumps, loads
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
@@ -27,6 +26,7 @@ depends_on: str | Sequence[str] | None = None
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
connection = op.get_bind()
|
||||
@@ -50,6 +50,7 @@ def upgrade(name: str = '') -> None:
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
connection = op.get_bind()
|
||||
|
||||
@@ -28,12 +28,6 @@ depends_on: str | Sequence[str] | None = None
|
||||
def upgrade(name: str = '') -> None: # noqa: C901
|
||||
if name:
|
||||
return
|
||||
from nonebot_plugin_tetris_stats.version import __version__
|
||||
|
||||
if __version__ != '1.0.3':
|
||||
msg = '本迁移需要1.0.3版本, 请先锁定版本至1.0.3版本再执行本迁移'
|
||||
logger.critical(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
from nonebot.compat import PYDANTIC_V2, type_validate_json
|
||||
from pydantic import BaseModel, ValidationError
|
||||
@@ -46,10 +40,6 @@ def upgrade(name: str = '') -> None: # noqa: C901
|
||||
TimeRemainingColumn,
|
||||
)
|
||||
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import ( # type: ignore[import-untyped]
|
||||
BaseProcessedData,
|
||||
)
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
Base.prepare(autoload_with=op.get_bind())
|
||||
HistoricalData = Base.classes.nonebot_plugin_tetris_stats_historicaldata # noqa: N806
|
||||
@@ -62,18 +52,33 @@ def upgrade(name: str = '') -> None: # noqa: C901
|
||||
def model_to_json(value: BaseModel) -> str:
|
||||
return value.json(by_alias=True)
|
||||
|
||||
models = BaseProcessedData.__subclasses__()
|
||||
|
||||
def json_to_model(value: str) -> BaseModel:
|
||||
for i in models:
|
||||
try:
|
||||
return type_validate_json(i, value)
|
||||
except ValidationError: # noqa: PERF203
|
||||
...
|
||||
raise ValueError
|
||||
|
||||
with Session(op.get_bind()) as session:
|
||||
count = session.query(HistoricalData).count()
|
||||
if count == 0:
|
||||
logger.info('空表, 跳过')
|
||||
return
|
||||
|
||||
from nonebot_plugin_tetris_stats.version import __version__
|
||||
|
||||
if __version__ != '1.0.3':
|
||||
msg = '本迁移需要1.0.3版本, 请先锁定版本至1.0.3版本再执行本迁移'
|
||||
logger.critical(msg)
|
||||
raise RuntimeError(msg)
|
||||
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import ( # type: ignore[import-untyped]
|
||||
BaseProcessedData,
|
||||
)
|
||||
|
||||
models = BaseProcessedData.__subclasses__()
|
||||
|
||||
def json_to_model(value: str) -> BaseModel:
|
||||
for i in models:
|
||||
try:
|
||||
return type_validate_json(i, value)
|
||||
except ValidationError: # noqa: PERF203
|
||||
...
|
||||
raise ValueError
|
||||
|
||||
with Progress(
|
||||
TextColumn('[progress.description]{task.description}'),
|
||||
BarColumn(),
|
||||
|
||||
@@ -26,12 +26,7 @@ depends_on: str | Sequence[str] | None = None
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
from nonebot_plugin_tetris_stats.version import __version__
|
||||
|
||||
if __version__ != '1.0.4':
|
||||
msg = '本迁移需要1.0.4版本, 请先锁定版本至1.0.4版本再执行本迁移'
|
||||
logger.critical(msg)
|
||||
raise RuntimeError(msg)
|
||||
from nonebot.compat import type_validate_json
|
||||
from pydantic import ValidationError
|
||||
from rich.progress import (
|
||||
@@ -46,8 +41,6 @@ def upgrade(name: str = '') -> None:
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser # type: ignore[import-untyped]
|
||||
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('user_unique_identifier', sa.String(length=32), nullable=True))
|
||||
batch_op.create_index(
|
||||
@@ -60,37 +53,48 @@ def upgrade(name: str = '') -> None:
|
||||
Base.prepare(autoload_with=connection)
|
||||
HistoricalData = Base.classes.nonebot_plugin_tetris_stats_historicaldata # noqa: N806
|
||||
|
||||
models: list[type[BaseUser]] = BaseUser.__subclasses__()
|
||||
|
||||
def json_to_model(value: str) -> BaseUser:
|
||||
for i in models:
|
||||
try:
|
||||
return type_validate_json(i, value)
|
||||
except ValidationError: # noqa: PERF203
|
||||
...
|
||||
raise ValueError
|
||||
|
||||
with Session(op.get_bind()) as session:
|
||||
count = session.query(HistoricalData).count()
|
||||
with Progress(
|
||||
TextColumn('[progress.description]{task.description}'),
|
||||
BarColumn(),
|
||||
MofNCompleteColumn(),
|
||||
TaskProgressColumn(),
|
||||
TimeRemainingColumn(),
|
||||
) as progress:
|
||||
task_id = progress.add_task('[cyan]Updateing:', total=count)
|
||||
for i in range(0, count, 100):
|
||||
for j in session.scalars(
|
||||
select(HistoricalData).where(HistoricalData.id > i).order_by(HistoricalData.id).limit(100)
|
||||
):
|
||||
model = json_to_model(j.game_user)
|
||||
if count == 0:
|
||||
logger.info('空表, 跳过')
|
||||
else:
|
||||
from nonebot_plugin_tetris_stats.version import __version__
|
||||
|
||||
if __version__ != '1.0.4':
|
||||
msg = '本迁移需要1.0.4版本, 请先锁定版本至1.0.4版本再执行本迁移'
|
||||
logger.critical(msg)
|
||||
raise RuntimeError(msg)
|
||||
from nonebot_plugin_tetris_stats.game_data_processor.schemas import BaseUser # type: ignore[import-untyped]
|
||||
|
||||
models: list[type[BaseUser]] = BaseUser.__subclasses__()
|
||||
|
||||
def json_to_model(value: str) -> BaseUser:
|
||||
for i in models:
|
||||
try:
|
||||
j.user_unique_identifier = model.unique_identifier
|
||||
except ValueError:
|
||||
session.delete(j)
|
||||
progress.update(task_id, advance=1)
|
||||
session.commit()
|
||||
return type_validate_json(i, value)
|
||||
except ValidationError: # noqa: PERF203
|
||||
...
|
||||
raise ValueError
|
||||
|
||||
with Progress(
|
||||
TextColumn('[progress.description]{task.description}'),
|
||||
BarColumn(),
|
||||
MofNCompleteColumn(),
|
||||
TaskProgressColumn(),
|
||||
TimeRemainingColumn(),
|
||||
) as progress:
|
||||
task_id = progress.add_task('[cyan]Updateing:', total=count)
|
||||
for i in range(0, count, 100):
|
||||
for j in session.scalars(
|
||||
select(HistoricalData).where(HistoricalData.id > i).order_by(HistoricalData.id).limit(100)
|
||||
):
|
||||
model = json_to_model(j.game_user)
|
||||
try:
|
||||
j.user_unique_identifier = model.unique_identifier
|
||||
except ValueError:
|
||||
session.delete(j)
|
||||
progress.update(task_id, advance=1)
|
||||
session.commit()
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
|
||||
batch_op.alter_column('user_unique_identifier', existing_type=sa.VARCHAR(length=32), nullable=False)
|
||||
logger.success('database upgrade success')
|
||||
|
||||
@@ -23,7 +23,7 @@ command = Subcommand(
|
||||
)
|
||||
|
||||
|
||||
from . import bind, config, query, record # noqa: E402
|
||||
from . import bind, config, list, query, rank, record # noqa: E402
|
||||
|
||||
main_command.add(command)
|
||||
|
||||
@@ -31,6 +31,8 @@ __all__ = [
|
||||
'alc',
|
||||
'bind',
|
||||
'config',
|
||||
'list',
|
||||
'query',
|
||||
'rank',
|
||||
'record',
|
||||
]
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from .player import Player
|
||||
from .schemas.user import User
|
||||
from .schemas.user_info import UserInfoSuccess
|
||||
from .tetra_league import full_export as tetra_league_full_export
|
||||
|
||||
__all__ = ['Player', 'User', 'UserInfoSuccess', 'tetra_league_full_export']
|
||||
__all__ = ['Player', 'User', 'UserInfoSuccess']
|
||||
|
||||
@@ -6,25 +6,29 @@ from weakref import WeakValueDictionary
|
||||
from aiocache import Cache as ACache # type: ignore[import-untyped]
|
||||
from nonebot.compat import type_validate_json
|
||||
from nonebot.log import logger
|
||||
from yarl import URL
|
||||
|
||||
from ....config.config import config
|
||||
from ....utils.request import Request
|
||||
from .schemas.base import FailedModel, SuccessModel
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.tetrio or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Cache:
|
||||
cache = ACache(ACache.MEMORY)
|
||||
task: ClassVar[WeakValueDictionary[str, Lock]] = WeakValueDictionary()
|
||||
task: ClassVar[WeakValueDictionary[URL, Lock]] = WeakValueDictionary()
|
||||
|
||||
@classmethod
|
||||
async def get(cls, url: str) -> bytes:
|
||||
async def get(cls, url: URL, extra_headers: dict | None = None) -> bytes:
|
||||
lock = cls.task.setdefault(url, Lock())
|
||||
async with lock:
|
||||
if (cached_data := await cls.cache.get(url)) is not None:
|
||||
logger.debug(f'{url}: Cache hit!')
|
||||
return cached_data
|
||||
response_data = await Request.request(url)
|
||||
response_data = await request.request(url, extra_headers, enable_anti_cloudflare=True)
|
||||
parsed_data: SuccessModel | FailedModel = type_validate_json(SuccessModel | FailedModel, response_data) # type: ignore[arg-type]
|
||||
if isinstance(parsed_data, SuccessModel):
|
||||
await cls.cache.add(
|
||||
|
||||
89
nonebot_plugin_tetris_stats/games/tetrio/api/leaderboards.py
Normal file
89
nonebot_plugin_tetris_stats/games/tetrio/api/leaderboards.py
Normal file
@@ -0,0 +1,89 @@
|
||||
from typing import Literal, overload
|
||||
from uuid import UUID
|
||||
|
||||
from nonebot.compat import type_validate_json
|
||||
from yarl import URL
|
||||
|
||||
from ....utils.exception import RequestError
|
||||
from ..constant import BASE_URL
|
||||
from .cache import Cache
|
||||
from .schemas.base import FailedModel
|
||||
from .schemas.leaderboards import Parameter
|
||||
from .schemas.leaderboards.by import By, BySuccessModel
|
||||
from .schemas.leaderboards.solo import Solo, SoloSuccessModel
|
||||
from .schemas.leaderboards.zenith import Zenith, ZenithSuccessModel
|
||||
|
||||
|
||||
async def by(
|
||||
by_type: Literal['league', 'xp', 'ar'], parameter: Parameter, x_session_id: UUID | None = None
|
||||
) -> BySuccessModel:
|
||||
model: By = type_validate_json(
|
||||
By, # type: ignore[arg-type]
|
||||
await get(
|
||||
BASE_URL / f'users/by/{by_type}',
|
||||
parameter,
|
||||
{'X-Session-ID': str(x_session_id)} if x_session_id is not None else None,
|
||||
),
|
||||
)
|
||||
if isinstance(model, FailedModel):
|
||||
msg = f'排行榜信息请求错误:\n{model.error}'
|
||||
raise RequestError(msg)
|
||||
return model
|
||||
|
||||
|
||||
@overload
|
||||
async def records(
|
||||
records_type: Literal['40l', 'blitz'],
|
||||
scope: str = '_global',
|
||||
revolution_id: str | None = None,
|
||||
*,
|
||||
parameter: Parameter,
|
||||
) -> SoloSuccessModel: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def records(
|
||||
records_type: Literal['zenith', 'zenithex'],
|
||||
scope: str = '_global',
|
||||
revolution_id: str | None = None,
|
||||
*,
|
||||
parameter: Parameter,
|
||||
) -> ZenithSuccessModel: ...
|
||||
|
||||
|
||||
async def records(
|
||||
records_type: Literal['40l', 'blitz', 'zenith', 'zenithex'],
|
||||
scope: str = '_global',
|
||||
revolution_id: str | None = None,
|
||||
*,
|
||||
parameter: Parameter,
|
||||
) -> SoloSuccessModel | ZenithSuccessModel:
|
||||
model: Solo | Zenith
|
||||
match records_type:
|
||||
case '40l' | 'blitz':
|
||||
model = type_validate_json(
|
||||
Solo, # type: ignore[arg-type]
|
||||
await get(
|
||||
BASE_URL / 'records' / f'{records_type}{scope}{revolution_id if revolution_id is not None else ""}',
|
||||
parameter,
|
||||
),
|
||||
)
|
||||
case 'zenith' | 'zenithex':
|
||||
model = type_validate_json(
|
||||
Zenith, # type: ignore[arg-type]
|
||||
await get(
|
||||
BASE_URL / 'records' / f'{records_type}{scope}{revolution_id if revolution_id is not None else ""}',
|
||||
parameter,
|
||||
),
|
||||
)
|
||||
case _:
|
||||
msg = f'records_type: {records_type} is not supported'
|
||||
raise ValueError(msg)
|
||||
if isinstance(model, FailedModel):
|
||||
msg = f'排行榜信息请求错误:\n{model.error}' # type: ignore[attr-defined]
|
||||
raise RequestError(msg)
|
||||
return model
|
||||
|
||||
|
||||
async def get(url: URL, parameter: Parameter, extra_headers: dict | None = None) -> bytes:
|
||||
return await Cache.get(url % parameter.to_params(), extra_headers)
|
||||
@@ -7,7 +7,6 @@ from nonebot.compat import type_validate_json
|
||||
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import splice_url
|
||||
from ..constant import BASE_URL, USER_ID, USER_NAME
|
||||
from .cache import Cache
|
||||
from .models import TETRIOHistoricalData
|
||||
@@ -24,6 +23,7 @@ from .schemas.summaries import (
|
||||
SoloSuccessModel as SummariesSoloSuccessModel,
|
||||
)
|
||||
from .schemas.summaries.base import User as SummariesUser
|
||||
from .schemas.summaries.league import LeagueSuccessModel
|
||||
from .schemas.user import User
|
||||
from .schemas.user_info import UserInfo, UserInfoSuccess
|
||||
from .typing import Records, Summaries
|
||||
@@ -55,6 +55,7 @@ class Player:
|
||||
'blitz': SummariesSoloSuccessModel,
|
||||
'zenith': ZenithSuccessModel,
|
||||
'zenithex': ZenithSuccessModel,
|
||||
'league': LeagueSuccessModel,
|
||||
'zen': ZenSuccessModel,
|
||||
'achievements': AchievementsSuccessModel,
|
||||
}
|
||||
@@ -86,12 +87,7 @@ class Player:
|
||||
|
||||
@property
|
||||
def _request_user_parameter(self) -> str:
|
||||
if self.user_id is not None:
|
||||
return self.user_id
|
||||
if self.user_name is not None:
|
||||
return self.user_name.lower()
|
||||
msg = 'Invalid user'
|
||||
raise ValueError(msg)
|
||||
return self.user_id or cast(str, self.user_name).lower()
|
||||
|
||||
@property
|
||||
async def user(self) -> User:
|
||||
@@ -115,7 +111,7 @@ class Player:
|
||||
async def get_info(self) -> UserInfoSuccess:
|
||||
"""Get User Info"""
|
||||
if self._user_info is None:
|
||||
raw_user_info = await Cache.get(splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}']))
|
||||
raw_user_info = await Cache.get(BASE_URL / 'users' / self._request_user_parameter)
|
||||
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
|
||||
if isinstance(user_info, FailedModel):
|
||||
msg = f'用户信息请求错误:\n{user_info.error}'
|
||||
@@ -138,12 +134,14 @@ class Player:
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['zen']) -> ZenSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['league']) -> LeagueSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['achievements']) -> AchievementsSuccessModel: ...
|
||||
|
||||
async def get_summaries(self, summaries_type: Summaries) -> SummariesModel:
|
||||
if summaries_type not in self._summaries:
|
||||
raw_summaries = await Cache.get(
|
||||
splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}/', 'summaries/', summaries_type])
|
||||
BASE_URL / 'users' / self._request_user_parameter / 'summaries' / summaries_type
|
||||
)
|
||||
summaries: SummariesModel | FailedModel = type_validate_json(
|
||||
self.__SUMMARIES_MAPPING[summaries_type] | FailedModel, # type: ignore[arg-type]
|
||||
@@ -164,20 +162,21 @@ class Player:
|
||||
return self._summaries[summaries_type]
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def sprint(self) -> SummariesSoloSuccessModel:
|
||||
return await self.get_summaries('40l')
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def blitz(self) -> SummariesSoloSuccessModel:
|
||||
return await self.get_summaries('blitz')
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def zen(self) -> ZenSuccessModel:
|
||||
return await self.get_summaries('zen')
|
||||
|
||||
@property
|
||||
async def league(self) -> LeagueSuccessModel:
|
||||
return await self.get_summaries('league')
|
||||
|
||||
async def _get_local_summaries_user(self) -> SummariesUser | None:
|
||||
allow_summaries: set[Literal['40l', 'blitz', 'zenith', 'zenithex']] = {
|
||||
'40l',
|
||||
@@ -212,16 +211,7 @@ class Player:
|
||||
async def get_records(self, mode_type: RecordModeType, records_type: RecordType) -> RecordsSoloSuccessModel:
|
||||
if (record_key := RecordKey(mode_type, records_type)) not in self._records:
|
||||
raw_records = await Cache.get(
|
||||
splice_url(
|
||||
[
|
||||
BASE_URL,
|
||||
'users/',
|
||||
f'{self._request_user_parameter}/',
|
||||
'records/',
|
||||
f'{mode_type}/',
|
||||
records_type,
|
||||
]
|
||||
)
|
||||
BASE_URL / 'users' / self._request_user_parameter / 'records' / mode_type / records_type,
|
||||
)
|
||||
records: RecordsSoloSuccessModel | FailedModel = type_validate_json(SoloRecord, raw_records) # type: ignore[arg-type]
|
||||
if isinstance(records, FailedModel):
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Prisecter
|
||||
|
||||
|
||||
class AggregateStats(BaseModel):
|
||||
@@ -39,11 +41,29 @@ class Garbage(BaseModel):
|
||||
cleared: int
|
||||
|
||||
|
||||
class P(BaseModel): # what is P
|
||||
class P(BaseModel):
|
||||
pri: float
|
||||
sec: float
|
||||
ter: float
|
||||
|
||||
def to_prisecter(self) -> Prisecter:
|
||||
return Prisecter(f'{self.pri}:{self.sec}:{self.ter}')
|
||||
|
||||
|
||||
class ArCounts(BaseModel):
|
||||
bronze: int | None = Field(default=None, alias='1')
|
||||
silver: int | None = Field(default=None, alias='2')
|
||||
gold: int | None = Field(default=None, alias='3')
|
||||
platinum: int | None = Field(default=None, alias='4')
|
||||
diamond: int | None = Field(default=None, alias='5')
|
||||
issued: int | None = Field(default=None, alias='100')
|
||||
top3: int | None = Field(default=None, alias='t3')
|
||||
top5: int | None = Field(default=None, alias='t5')
|
||||
top10: int | None = Field(default=None, alias='t10')
|
||||
top25: int | None = Field(default=None, alias='t25')
|
||||
top50: int | None = Field(default=None, alias='t50')
|
||||
top100: int | None = Field(default=None, alias='t100')
|
||||
|
||||
|
||||
class Cache(BaseModel):
|
||||
status: str
|
||||
|
||||
@@ -21,7 +21,7 @@ class Stats(BaseModel):
|
||||
level_lines: int
|
||||
level_lines_needed: int
|
||||
inputs: int
|
||||
holds: int
|
||||
holds: int = 0
|
||||
time: Time | None = None # ?: 不知道是之后都没有了还是还会有
|
||||
score: int
|
||||
zenlevel: int
|
||||
|
||||
@@ -0,0 +1,15 @@
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Prisecter
|
||||
|
||||
|
||||
class Parameter(BaseModel):
|
||||
after: Prisecter | None = None
|
||||
before: Prisecter | None = None
|
||||
limit: int = Field(default=25, ge=1, le=100)
|
||||
country: str | None = None
|
||||
|
||||
def to_params(self) -> dict[str, Any]:
|
||||
return self.model_dump(exclude_defaults=True)
|
||||
@@ -1,27 +0,0 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ..base import SuccessModel
|
||||
from .base import Entry as BaseEntry
|
||||
|
||||
|
||||
class ArCounts(BaseModel):
|
||||
bronze: int | None = Field(None, alias='1')
|
||||
silver: int | None = Field(None, alias='2')
|
||||
gold: int | None = Field(None, alias='3')
|
||||
platinum: int | None = Field(None, alias='4')
|
||||
diamond: int | None = Field(None, alias='5')
|
||||
issued: int | None = Field(None, alias='100')
|
||||
top10: int | None = Field(None, alias='t10')
|
||||
|
||||
|
||||
class Entry(BaseEntry):
|
||||
ar: int
|
||||
ar_counts: ArCounts
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
entries: list[Entry]
|
||||
|
||||
|
||||
class ArSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
@@ -1,30 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Rank
|
||||
from ..base import P
|
||||
|
||||
|
||||
class League(BaseModel):
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
rating: int
|
||||
rank: Rank
|
||||
decaying: bool
|
||||
|
||||
|
||||
class Entry(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
username: str
|
||||
role: str
|
||||
xp: float
|
||||
league: League
|
||||
supporter: bool | None = None
|
||||
verified: bool
|
||||
country: str | None = None
|
||||
ts: datetime
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
gametime: float
|
||||
p: P
|
||||
@@ -0,0 +1,50 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Rank, ValidRank
|
||||
from ..base import ArCounts, FailedModel, P, SuccessModel
|
||||
|
||||
|
||||
class League(BaseModel):
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
tr: float
|
||||
gxe: float
|
||||
rank: Rank
|
||||
bestrank: ValidRank
|
||||
glicko: float
|
||||
rd: float
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
decaying: bool
|
||||
|
||||
|
||||
class Entry(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
username: str
|
||||
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop']
|
||||
ts: datetime | None = None
|
||||
xp: float
|
||||
country: str | None = None
|
||||
supporter: bool | None = None
|
||||
league: League
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
gametime: float
|
||||
ar: int
|
||||
ar_counts: ArCounts
|
||||
p: P
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
entries: list[Entry]
|
||||
|
||||
|
||||
class BySuccessModel(SuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
By = BySuccessModel | FailedModel
|
||||
@@ -1,6 +1,6 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..base import SuccessModel
|
||||
from ..base import FailedModel, SuccessModel
|
||||
from ..summaries.solo import Record
|
||||
|
||||
|
||||
@@ -10,3 +10,6 @@ class Data(BaseModel):
|
||||
|
||||
class SoloSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
Solo = SoloSuccessModel | FailedModel
|
||||
|
||||
@@ -1,12 +0,0 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..base import SuccessModel
|
||||
from .base import Entry
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
entries: list[Entry]
|
||||
|
||||
|
||||
class XpSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
@@ -1,6 +1,6 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..base import SuccessModel
|
||||
from ..base import FailedModel, SuccessModel
|
||||
from ..summaries.zenith import Record
|
||||
|
||||
|
||||
@@ -10,3 +10,6 @@ class Data(BaseModel):
|
||||
|
||||
class ZenithSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
Zenith = ZenithSuccessModel | FailedModel
|
||||
|
||||
@@ -1,19 +1,21 @@
|
||||
from .achievements import Achievements, AchievementsSuccessModel
|
||||
from .league import LeagueSuccessModel
|
||||
from .solo import Solo, SoloSuccessModel
|
||||
from .zen import Zen, ZenSuccessModel
|
||||
from .zenith import Zenith, ZenithEx, ZenithSuccessModel
|
||||
|
||||
SummariesModel = AchievementsSuccessModel | SoloSuccessModel | ZenSuccessModel | ZenithSuccessModel
|
||||
SummariesModel = AchievementsSuccessModel | SoloSuccessModel | ZenSuccessModel | LeagueSuccessModel | ZenithSuccessModel
|
||||
|
||||
__all__ = [
|
||||
'Achievements',
|
||||
'AchievementsSuccessModel',
|
||||
'LeagueSuccessModel',
|
||||
'Solo',
|
||||
'SoloSuccessModel',
|
||||
'SummariesModel',
|
||||
'Zen',
|
||||
'ZenSuccessModel',
|
||||
'Zenith',
|
||||
'ZenithEx',
|
||||
'ZenithSuccessModel',
|
||||
'SummariesModel',
|
||||
'ZenSuccessModel',
|
||||
]
|
||||
|
||||
@@ -7,5 +7,4 @@ class User(BaseModel):
|
||||
avatar_revision: int | None
|
||||
banner_revision: int | None
|
||||
country: str | None
|
||||
verified: int
|
||||
supporter: int
|
||||
|
||||
@@ -0,0 +1,120 @@
|
||||
from functools import partial
|
||||
from typing import Literal
|
||||
|
||||
from nonebot.compat import PYDANTIC_V2
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Rank, S1Rank, S1ValidRank
|
||||
from ..base import SuccessModel
|
||||
|
||||
if PYDANTIC_V2:
|
||||
from pydantic import field_validator
|
||||
|
||||
custom_validator = partial(field_validator, mode='before')
|
||||
else:
|
||||
from pydantic import validator
|
||||
|
||||
custom_validator = partial(validator, pre=True, always=True) # type: ignore[assignment, arg-type]
|
||||
|
||||
|
||||
class PastInner(BaseModel):
|
||||
season: str
|
||||
username: str
|
||||
country: str | None = None
|
||||
placement: int | None = None
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
glicko: float
|
||||
gxe: float
|
||||
tr: float
|
||||
rd: float
|
||||
rank: S1Rank
|
||||
bestrank: S1ValidRank
|
||||
ranked: bool
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
|
||||
|
||||
class Past(BaseModel):
|
||||
first: PastInner | None = Field(default=None, alias='1')
|
||||
|
||||
|
||||
class BaseData(BaseModel):
|
||||
decaying: bool
|
||||
past: Past
|
||||
|
||||
|
||||
class NeverPlayedData(BaseData):
|
||||
gamesplayed: Literal[0]
|
||||
gameswon: Literal[0]
|
||||
glicko: Literal[-1]
|
||||
rd: Literal[-1]
|
||||
gxe: Literal[-1]
|
||||
tr: Literal[-1]
|
||||
rank: Literal['z']
|
||||
apm: None = None
|
||||
pps: None = None
|
||||
vs: None = None
|
||||
standing: Literal[-1]
|
||||
standing_local: Literal[-1]
|
||||
prev_rank: None
|
||||
prev_at: Literal[-1]
|
||||
next_rank: None
|
||||
next_at: Literal[-1]
|
||||
percentile: Literal[-1]
|
||||
percentile_rank: Literal['z']
|
||||
|
||||
|
||||
class NeverRatedData(BaseData):
|
||||
gamesplayed: Literal[1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
gameswon: int
|
||||
glicko: Literal[-1]
|
||||
rd: Literal[-1]
|
||||
gxe: Literal[-1]
|
||||
tr: Literal[-1]
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
rank: Literal['z']
|
||||
standing: Literal[-1]
|
||||
standing_local: Literal[-1]
|
||||
prev_rank: None
|
||||
prev_at: Literal[-1]
|
||||
next_rank: None
|
||||
next_at: Literal[-1]
|
||||
percentile: Literal[-1]
|
||||
percentile_rank: Literal['z']
|
||||
|
||||
@custom_validator('apm', 'pps', 'vs')
|
||||
@classmethod
|
||||
def _(cls, value: float | None) -> float:
|
||||
if value is None:
|
||||
return 0
|
||||
return value
|
||||
|
||||
|
||||
class RatedData(BaseData):
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
glicko: float
|
||||
rd: float
|
||||
gxe: float
|
||||
tr: float
|
||||
rank: Rank
|
||||
bestrank: Rank
|
||||
standing: int
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
standing_local: int
|
||||
prev_rank: Rank | None = None
|
||||
prev_at: int
|
||||
next_rank: Rank | None = None
|
||||
next_at: int
|
||||
percentile: float
|
||||
percentile_rank: str
|
||||
|
||||
|
||||
class LeagueSuccessModel(SuccessModel):
|
||||
data: NeverPlayedData | NeverRatedData | RatedData
|
||||
@@ -1,59 +0,0 @@
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ..typing import Rank
|
||||
from .base import FailedModel
|
||||
from .base import SuccessModel as BaseSuccessModel
|
||||
|
||||
|
||||
class _User(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
username: str
|
||||
role: str
|
||||
xp: float
|
||||
supporter: bool | None = None
|
||||
verified: bool
|
||||
country: str | None = None
|
||||
|
||||
|
||||
class _League(BaseModel):
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
rating: float
|
||||
rank: Rank
|
||||
bestrank: Rank
|
||||
decaying: bool
|
||||
|
||||
|
||||
class ValidLeague(_League):
|
||||
glicko: float
|
||||
rd: float
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
|
||||
|
||||
class ValidUser(_User):
|
||||
league: ValidLeague
|
||||
|
||||
|
||||
class InvalidLeague(_League):
|
||||
glicko: float | None = None
|
||||
rd: float | None = None
|
||||
apm: float | None = None
|
||||
pps: float | None = None
|
||||
vs: float | None = None
|
||||
|
||||
|
||||
class InvalidUser(_User):
|
||||
league: InvalidLeague
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
users: list[ValidUser | InvalidUser]
|
||||
|
||||
|
||||
class TetraLeagueSuccess(BaseSuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
TetraLeague = TetraLeagueSuccess | FailedModel
|
||||
@@ -3,7 +3,7 @@ from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .base import FailedModel
|
||||
from .base import ArCounts, FailedModel
|
||||
from .base import SuccessModel as BaseSuccessModel
|
||||
|
||||
|
||||
@@ -14,13 +14,19 @@ class Badge(BaseModel):
|
||||
ts: datetime | Literal[False] | None = None
|
||||
|
||||
|
||||
class Discord(BaseModel):
|
||||
class Connection(BaseModel):
|
||||
id: str
|
||||
username: str
|
||||
display_username: str
|
||||
|
||||
|
||||
class Connections(BaseModel):
|
||||
discord: Discord | None = None
|
||||
discord: Connection | None = None
|
||||
twitch: Connection | None = None
|
||||
twitter: Connection | None = None
|
||||
reddit: Connection | None = None
|
||||
youtube: Connection | None = None
|
||||
steam: Connection | None = None
|
||||
|
||||
|
||||
class Distinguishment(BaseModel):
|
||||
@@ -28,9 +34,9 @@ class Distinguishment(BaseModel):
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
id: str = Field(default=..., alias='_id')
|
||||
username: str
|
||||
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop', 'banned']
|
||||
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop', 'hidden', 'banned']
|
||||
ts: datetime | None = None
|
||||
botmaster: str | None = None
|
||||
badges: list[Badge]
|
||||
@@ -42,7 +48,6 @@ class Data(BaseModel):
|
||||
badstanding: bool | None = None
|
||||
supporter: bool | None = None # osk说是必有, 但实际上不是 fkosk
|
||||
supporter_tier: int
|
||||
verified: bool
|
||||
avatar_revision: int | None = None
|
||||
"""This user's avatar ID. Get their avatar at
|
||||
|
||||
@@ -57,6 +62,9 @@ class Data(BaseModel):
|
||||
connections: Connections
|
||||
friend_count: int | None = None
|
||||
distinguishment: Distinguishment | None = None
|
||||
achievements: list[int]
|
||||
ar: int
|
||||
ar_counts: ArCounts
|
||||
|
||||
|
||||
class UserInfoSuccess(BaseSuccessModel):
|
||||
|
||||
@@ -1,55 +0,0 @@
|
||||
from typing import Literal, NamedTuple, TypedDict, overload
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.compat import type_validate_json
|
||||
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import splice_url
|
||||
from ..constant import BASE_URL
|
||||
from .cache import Cache
|
||||
from .schemas.base import FailedModel
|
||||
from .schemas.tetra_league import TetraLeague, TetraLeagueSuccess
|
||||
|
||||
|
||||
class Parameter(TypedDict, total=False):
|
||||
after: float
|
||||
before: float
|
||||
limit: int
|
||||
country: str
|
||||
|
||||
|
||||
async def leaderboard(parameter: Parameter | None = None) -> TetraLeagueSuccess:
|
||||
league: TetraLeague = type_validate_json(
|
||||
TetraLeague, # type: ignore[arg-type]
|
||||
(await Cache.get(splice_url([BASE_URL, 'users/lists/league', f'?{urlencode(parameter or {})}']))),
|
||||
)
|
||||
if isinstance(league, FailedModel):
|
||||
msg = f'排行榜数据请求错误:\n{league.error}'
|
||||
raise RequestError(msg)
|
||||
return league
|
||||
|
||||
|
||||
class FullExport(NamedTuple):
|
||||
model: TetraLeagueSuccess
|
||||
original: bytes
|
||||
|
||||
|
||||
@overload
|
||||
async def full_export(*, with_original: Literal[False]) -> TetraLeagueSuccess: ...
|
||||
|
||||
|
||||
@overload
|
||||
async def full_export(*, with_original: Literal[True]) -> FullExport: ...
|
||||
|
||||
|
||||
async def full_export(*, with_original: bool) -> TetraLeagueSuccess | FullExport:
|
||||
full: TetraLeague = type_validate_json(
|
||||
TetraLeague, # type: ignore[arg-type]
|
||||
(data := await Cache.get(splice_url([BASE_URL, 'users/lists/league/all']))),
|
||||
)
|
||||
if isinstance(full, FailedModel):
|
||||
msg = f'排行榜数据请求错误:\n{full.error}'
|
||||
raise RequestError(msg)
|
||||
if with_original:
|
||||
return FullExport(full, data)
|
||||
return full
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import Literal
|
||||
from typing import Literal, NewType
|
||||
|
||||
ValidRank = Literal[
|
||||
S1ValidRank = Literal[
|
||||
'x+',
|
||||
'x',
|
||||
'u',
|
||||
'ss',
|
||||
@@ -19,7 +20,9 @@ ValidRank = Literal[
|
||||
'd+',
|
||||
'd',
|
||||
]
|
||||
S1Rank = S1ValidRank | Literal['z']
|
||||
|
||||
ValidRank = Literal['x+'] | S1ValidRank
|
||||
Rank = ValidRank | Literal['z'] # 未定级
|
||||
|
||||
Summaries = Literal[
|
||||
@@ -27,7 +30,7 @@ Summaries = Literal[
|
||||
'blitz',
|
||||
'zenith',
|
||||
'zenithex',
|
||||
# 'league', # 等待正式赛季开始
|
||||
'league',
|
||||
'zen',
|
||||
'achievements',
|
||||
]
|
||||
@@ -40,3 +43,5 @@ Records = Literal[
|
||||
'blitz_recent',
|
||||
'blitz_progression',
|
||||
]
|
||||
|
||||
Prisecter = NewType('Prisecter', str)
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from arclet.alconna import Arg, ArgFlag
|
||||
from nonebot_plugin_alconna import Args, Subcommand
|
||||
@@ -9,6 +8,7 @@ from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import User
|
||||
from nonebot_plugin_userinfo import BotUserInfo, UserInfo
|
||||
from yarl import URL
|
||||
|
||||
from ...db import BindStatus, create_or_update_bind, trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
@@ -67,7 +67,10 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
|
||||
platform='TETR.IO',
|
||||
status='unknown',
|
||||
user=People(
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}')
|
||||
% {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await account.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(type='identicon', hash=md5(user.ID.encode()).hexdigest()), # noqa: S324
|
||||
name=user.name.upper(),
|
||||
|
||||
@@ -1,13 +1,16 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
from .api.typing import ValidRank
|
||||
|
||||
GAME_TYPE: Literal['IO'] = 'IO'
|
||||
|
||||
BASE_URL = 'https://ch.tetr.io/api/'
|
||||
BASE_URL = URL('https://ch.tetr.io/api/')
|
||||
|
||||
RANK_PERCENTILE: dict[ValidRank, float] = {
|
||||
'x+': 0.2,
|
||||
'x': 1,
|
||||
'u': 5,
|
||||
'ss': 11,
|
||||
|
||||
91
nonebot_plugin_tetris_stats/games/tetrio/list.py
Normal file
91
nonebot_plugin_tetris_stats/games/tetrio/list.py
Normal file
@@ -0,0 +1,91 @@
|
||||
from nonebot_plugin_alconna import Args, Option, Subcommand
|
||||
from nonebot_plugin_alconna.uniseg import UniMessage
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
|
||||
from ...db import trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.tetrio.user.list_v2 import List, TetraLeague, User
|
||||
from ...utils.screenshot import screenshot
|
||||
from .. import alc
|
||||
from . import command
|
||||
from .api.leaderboards import by
|
||||
from .api.schemas.base import P
|
||||
from .api.schemas.leaderboards import Parameter
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
command.add(
|
||||
Subcommand(
|
||||
'list',
|
||||
Option('--max-tr', Args['max_tr', float], help_text='TR的上限'),
|
||||
Option('--min-tr', Args['min_tr', float], help_text='TR的下限'),
|
||||
Option('--limit', Args['limit', int], help_text='查询数量'),
|
||||
Option('--country', Args['country', str], help_text='国家代码'),
|
||||
help_text='查询 TETR.IO 段位排行榜',
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@alc.assign('TETRIO.list')
|
||||
async def _(
|
||||
event_session: EventSession,
|
||||
max_tr: float | None = None,
|
||||
min_tr: float | None = None,
|
||||
limit: int | None = None,
|
||||
country: str | None = None,
|
||||
):
|
||||
country = country.upper() if country is not None else None
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='list',
|
||||
command_args=[
|
||||
f'{key} {value}'
|
||||
for key, value in zip(
|
||||
('--max-tr', '--min-tr', '--limit', '--country'), (max_tr, min_tr, limit, country), strict=True
|
||||
)
|
||||
if value is not None
|
||||
],
|
||||
):
|
||||
parameter = Parameter(
|
||||
# ?: 似乎是只需要 pri 至少 league 榜的返回值只有 pri
|
||||
after=P(pri=max_tr, sec=0, ter=0).to_prisecter() if max_tr is not None else None,
|
||||
before=P(pri=min_tr, sec=0, ter=0).to_prisecter() if min_tr is not None else None,
|
||||
limit=limit or 25,
|
||||
country=country,
|
||||
)
|
||||
league = await by('league', parameter)
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v2/tetrio/user/list',
|
||||
List(
|
||||
show_index=True,
|
||||
users=[
|
||||
User(
|
||||
id=i.id,
|
||||
name=i.username.upper(),
|
||||
avatar=f'https://tetr.io/user-content/avatars/{i.id}.jpg',
|
||||
country=i.country,
|
||||
tetra_league=TetraLeague(
|
||||
rank=i.league.rank,
|
||||
tr=round(i.league.tr, 2),
|
||||
glicko=round(i.league.glicko, 2),
|
||||
rd=round(i.league.rd, 2),
|
||||
decaying=i.league.decaying,
|
||||
pps=(metrics := get_metrics(pps=i.league.pps, apm=i.league.apm, vs=i.league.vs)).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs,
|
||||
adpl=metrics.adpl,
|
||||
),
|
||||
xp=i.xp,
|
||||
join_at=None,
|
||||
)
|
||||
for i in league.data.entries
|
||||
],
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
await UniMessage.image(raw=await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')).finish()
|
||||
@@ -1,10 +1,53 @@
|
||||
from nonebot_plugin_orm import Model
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
|
||||
from datetime import datetime
|
||||
from uuid import UUID
|
||||
|
||||
from nonebot_plugin_orm import Model
|
||||
from sqlalchemy import DateTime, ForeignKey, String
|
||||
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column, relationship
|
||||
|
||||
from ...db.models import PydanticType
|
||||
from .api.schemas.leaderboards.by import BySuccessModel, Entry
|
||||
from .api.typing import ValidRank
|
||||
from .typing import Template
|
||||
|
||||
|
||||
class TETRIOUserConfig(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(primary_key=True)
|
||||
query_template: Mapped[Template] = mapped_column(String(2))
|
||||
|
||||
|
||||
class TETRIOLeagueStats(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
raw: Mapped[list['TETRIOLeagueHistorical']] = relationship(back_populates='stats', lazy='noload')
|
||||
fields: Mapped[list['TETRIOLeagueStatsField']] = relationship(back_populates='stats')
|
||||
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
|
||||
|
||||
|
||||
class TETRIOLeagueHistorical(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
request_id: Mapped[UUID] = mapped_column(index=True)
|
||||
data: Mapped[BySuccessModel] = mapped_column(PydanticType([], {BySuccessModel}))
|
||||
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
|
||||
stats_id: Mapped[int] = mapped_column(ForeignKey('nonebot_plugin_tetris_stats_tetrioleaguestats.id'), init=False)
|
||||
stats: Mapped['TETRIOLeagueStats'] = relationship(back_populates='raw')
|
||||
|
||||
|
||||
entry_type = PydanticType([], {Entry})
|
||||
|
||||
|
||||
class TETRIOLeagueStatsField(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
rank: Mapped[ValidRank] = mapped_column(String(2), index=True)
|
||||
tr_line: Mapped[float]
|
||||
player_count: Mapped[int]
|
||||
low_pps: Mapped[Entry] = mapped_column(entry_type)
|
||||
low_apm: Mapped[Entry] = mapped_column(entry_type)
|
||||
low_vs: Mapped[Entry] = mapped_column(entry_type)
|
||||
avg_pps: Mapped[float]
|
||||
avg_apm: Mapped[float]
|
||||
avg_vs: Mapped[float]
|
||||
high_pps: Mapped[Entry] = mapped_column(entry_type)
|
||||
high_apm: Mapped[Entry] = mapped_column(entry_type)
|
||||
high_vs: Mapped[Entry] = mapped_column(entry_type)
|
||||
stats_id: Mapped[int] = mapped_column(ForeignKey('nonebot_plugin_tetris_stats_tetrioleaguestats.id'), init=False)
|
||||
stats: Mapped['TETRIOLeagueStats'] = relationship(back_populates='fields')
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
from asyncio import gather
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from hashlib import md5
|
||||
from typing import TYPE_CHECKING, TypeVar
|
||||
from urllib.parse import urlencode
|
||||
from typing import TypeVar
|
||||
|
||||
from arclet.alconna import Arg, ArgFlag
|
||||
from nonebot import get_driver
|
||||
@@ -16,12 +15,22 @@ from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[im
|
||||
from nonebot_plugin_user import User as NBUser
|
||||
from nonebot_plugin_user import get_user
|
||||
from sqlalchemy import select
|
||||
from yarl import URL
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.base import Avatar
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import Badge, Blitz, Sprint, Statistic, Zen
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import (
|
||||
Badge,
|
||||
Blitz,
|
||||
Sprint,
|
||||
Statistic,
|
||||
TetraLeague,
|
||||
TetraLeagueStatistic,
|
||||
Zen,
|
||||
)
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import Info as V2TemplateInfo
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import User as V2TemplateUser
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -30,15 +39,11 @@ from .. import add_block_handlers, alc
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import command, get_player
|
||||
from .api import Player
|
||||
from .api.schemas.summaries.league import NeverPlayedData, NeverRatedData
|
||||
from .constant import GAME_TYPE
|
||||
from .models import TETRIOUserConfig
|
||||
from .typing import Template
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .api.schemas.summaries import SoloSuccessModel, ZenSuccessModel
|
||||
from .api.schemas.user import User
|
||||
from .api.schemas.user_info import UserInfoSuccess
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
driver = get_driver()
|
||||
@@ -99,7 +104,7 @@ async def _( # noqa: PLR0913
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[f'--default-template {template}'] if template is not None else [],
|
||||
command_args=[f'--template {template}'] if template is not None else [],
|
||||
):
|
||||
async with get_session() as session:
|
||||
bind = await query_bind_info(
|
||||
@@ -126,7 +131,7 @@ async def _(user: NBUser, account: Player, event_session: EventSession, template
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='query',
|
||||
command_args=[f'--default-template {template}'] if template is not None else [],
|
||||
command_args=[f'--template {template}'] if template is not None else [],
|
||||
):
|
||||
async with get_session() as session:
|
||||
if template is None:
|
||||
@@ -144,24 +149,13 @@ def handling_special_value(value: N) -> N | None:
|
||||
|
||||
|
||||
async def make_query_image_v2(player: Player) -> bytes:
|
||||
user: User
|
||||
user_info: UserInfoSuccess
|
||||
sprint: SoloSuccessModel
|
||||
blitz: SoloSuccessModel
|
||||
zen: ZenSuccessModel
|
||||
avatar_revision: int | None
|
||||
banner_revision: int | None
|
||||
# TODO)) 有没有什么办法能让这类型推导成功)
|
||||
user, user_info, sprint, blitz, zen, avatar_revision, banner_revision = await gather( # type: ignore[assignment]
|
||||
player.user,
|
||||
player.get_info(),
|
||||
player.sprint,
|
||||
player.blitz,
|
||||
player.zen,
|
||||
player.avatar_revision,
|
||||
player.banner_revision,
|
||||
(
|
||||
(user, user_info, league, sprint, blitz, zen),
|
||||
(avatar_revision, banner_revision),
|
||||
) = await gather(
|
||||
gather(player.user, player.get_info(), player.league, player.sprint, player.blitz, player.zen),
|
||||
gather(player.avatar_revision, player.banner_revision),
|
||||
)
|
||||
|
||||
if sprint.data.record is not None:
|
||||
duration = timedelta(milliseconds=sprint.data.record.results.stats.finaltime).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
@@ -187,10 +181,14 @@ async def make_query_image_v2(player: Player) -> bytes:
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.bio,
|
||||
banner=f'http://{netloc}/host/resource/tetrio/banners/{user.ID}?{urlencode({"revision": banner_revision})}'
|
||||
banner=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/banners/{user.ID}') % {'revision': banner_revision}
|
||||
)
|
||||
if banner_revision is not None and banner_revision != 0
|
||||
else None,
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if avatar_revision is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
@@ -211,11 +209,28 @@ async def make_query_image_v2(player: Player) -> bytes:
|
||||
friend_count=user_info.data.friend_count,
|
||||
supporter_tier=user_info.data.supporter_tier,
|
||||
bad_standing=user_info.data.badstanding or False,
|
||||
verified=user_info.data.verified,
|
||||
playtime=play_time,
|
||||
join_at=user_info.data.ts,
|
||||
),
|
||||
tetra_league=None,
|
||||
tetra_league=TetraLeague(
|
||||
rank=league.data.rank,
|
||||
highest_rank='z' if isinstance(league.data, NeverRatedData) else league.data.bestrank,
|
||||
tr=round(league.data.tr, 2),
|
||||
glicko=round(league.data.glicko, 2),
|
||||
rd=round(league.data.rd, 2),
|
||||
global_rank=league.data.standing,
|
||||
country_rank=league.data.standing_local,
|
||||
pps=(metrics := get_metrics(pps=league.data.pps, apm=league.data.apm, vs=league.data.vs)).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs,
|
||||
adpl=metrics.adpl,
|
||||
statistic=TetraLeagueStatistic(total=league.data.gamesplayed, wins=league.data.gameswon),
|
||||
decaying=league.data.decaying,
|
||||
history=None,
|
||||
)
|
||||
if not isinstance(league.data, NeverPlayedData)
|
||||
else None,
|
||||
statistic=Statistic(
|
||||
total=handling_special_value(user_info.data.gamesplayed),
|
||||
wins=handling_special_value(user_info.data.gameswon),
|
||||
|
||||
154
nonebot_plugin_tetris_stats/games/tetrio/rank/__init__.py
Normal file
154
nonebot_plugin_tetris_stats/games/tetrio/rank/__init__.py
Normal file
@@ -0,0 +1,154 @@
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable, Sequence
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from math import floor
|
||||
from statistics import mean
|
||||
from typing import TYPE_CHECKING
|
||||
from uuid import uuid4
|
||||
|
||||
from nonebot import get_driver
|
||||
from nonebot_plugin_alconna import Subcommand
|
||||
from nonebot_plugin_apscheduler import scheduler
|
||||
from nonebot_plugin_orm import get_session
|
||||
from sqlalchemy import select
|
||||
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.limit import limit
|
||||
from ....utils.retry import retry
|
||||
from .. import alc
|
||||
from .. import command as base_command
|
||||
from ..api.leaderboards import by
|
||||
from ..api.schemas.base import P
|
||||
from ..api.schemas.leaderboards import Parameter
|
||||
from ..api.schemas.leaderboards.by import Entry
|
||||
from ..constant import RANK_PERCENTILE
|
||||
from ..models import TETRIOLeagueHistorical, TETRIOLeagueStats, TETRIOLeagueStatsField
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..api.schemas.leaderboards.by import BySuccessModel
|
||||
from ..api.typing import Rank
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
command = Subcommand('rank', help_text='查询 TETR.IO 段位信息')
|
||||
|
||||
|
||||
def wrapper(slot: int | str, content: str | None) -> str | None:
|
||||
if slot == 'rank' and not content:
|
||||
return '--all'
|
||||
if content is not None:
|
||||
return f'--detail {content.lower()}'
|
||||
return content
|
||||
|
||||
|
||||
alc.shortcut(
|
||||
r'(?i:io)(?i:段位|段|rank)\s*(?P<rank>[a-zA-Z+-]{0,2})',
|
||||
command='tstats TETR.IO rank {rank}',
|
||||
humanized='iorank',
|
||||
fuzzy=False,
|
||||
wrapper=wrapper,
|
||||
)
|
||||
|
||||
|
||||
def _pps(user: Entry) -> float:
|
||||
return user.league.pps
|
||||
|
||||
|
||||
def _apm(user: Entry) -> float:
|
||||
return user.league.apm
|
||||
|
||||
|
||||
def _vs(user: Entry) -> float:
|
||||
return user.league.vs
|
||||
|
||||
|
||||
def _min(users: Sequence[Entry], field: Callable[[Entry], float]) -> Entry:
|
||||
return min(users, key=field)
|
||||
|
||||
|
||||
def _max(users: Sequence[Entry], field: Callable[[Entry], float]) -> Entry:
|
||||
return max(users, key=field)
|
||||
|
||||
|
||||
def find_special_player(
|
||||
users: Sequence[Entry],
|
||||
field: Callable[[Entry], float],
|
||||
sort: Callable[[Sequence[Entry], Callable[[Entry], float]], Entry],
|
||||
) -> Entry:
|
||||
return sort(users, field)
|
||||
|
||||
|
||||
@scheduler.scheduled_job('cron', hour='0,6,12,18', minute=0)
|
||||
async def get_tetra_league_data() -> None:
|
||||
x_session_id = uuid4()
|
||||
limit_by = retry(max_attempts=10, exception_type=RequestError)(limit(timedelta(seconds=1))(by))
|
||||
prisecter = P(pri=9007199254740991, sec=9007199254740991, ter=9007199254740991) # * from ch.tetr.io
|
||||
results: list[BySuccessModel] = []
|
||||
while True:
|
||||
model = await limit_by('league', Parameter(after=prisecter.to_prisecter(), limit=100), x_session_id)
|
||||
prisecter = model.data.entries[-1].p
|
||||
results.append(model)
|
||||
if len(model.data.entries) < 100: # 分页值 # noqa: PLR2004
|
||||
break
|
||||
|
||||
players: list[Entry] = []
|
||||
for result in results:
|
||||
players.extend(result.data.entries)
|
||||
players.sort(key=lambda x: x.league.tr, reverse=True)
|
||||
|
||||
rank_player_mapping: defaultdict[Rank, list[Entry]] = defaultdict(list)
|
||||
for player in players:
|
||||
rank_player_mapping[player.league.rank].append(player)
|
||||
|
||||
stats = TETRIOLeagueStats(raw=[], fields=[], update_time=datetime.now(UTC))
|
||||
fields: list[TETRIOLeagueStatsField] = []
|
||||
for rank, percentile in RANK_PERCENTILE.items():
|
||||
offset = floor((percentile / 100) * len(players)) - 1
|
||||
tr_line = players[offset].league.tr
|
||||
rank_players = rank_player_mapping[rank]
|
||||
fields.append(
|
||||
TETRIOLeagueStatsField(
|
||||
rank=rank,
|
||||
tr_line=tr_line,
|
||||
player_count=len(rank_players),
|
||||
low_pps=find_special_player(rank_players, _pps, _min),
|
||||
low_apm=find_special_player(rank_players, _apm, _min),
|
||||
low_vs=find_special_player(rank_players, _vs, _min),
|
||||
avg_pps=mean(_pps(i) for i in rank_players),
|
||||
avg_apm=mean(_apm(i) for i in rank_players),
|
||||
avg_vs=mean(_vs(i) for i in rank_players),
|
||||
high_pps=find_special_player(rank_players, _pps, _max),
|
||||
high_apm=find_special_player(rank_players, _apm, _max),
|
||||
high_vs=find_special_player(rank_players, _vs, _max),
|
||||
stats=stats,
|
||||
)
|
||||
)
|
||||
historicals = [
|
||||
TETRIOLeagueHistorical(request_id=x_session_id, data=model, update_time=model.cache.cached_at, stats=stats)
|
||||
for model in results
|
||||
]
|
||||
stats.raw = historicals
|
||||
stats.fields = fields
|
||||
async with get_session() as session:
|
||||
session.add(stats)
|
||||
await session.commit()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _() -> None:
|
||||
async with get_session() as session:
|
||||
latest_time = await session.scalar(
|
||||
select(TETRIOLeagueStats.update_time).order_by(TETRIOLeagueStats.id.desc()).limit(1)
|
||||
)
|
||||
if latest_time is None or datetime.now(tz=UTC) - latest_time.replace(tzinfo=UTC) > timedelta(hours=6):
|
||||
await get_tetra_league_data()
|
||||
|
||||
|
||||
from . import all, detail # noqa: E402
|
||||
|
||||
base_command.add(command)
|
||||
|
||||
__all__ = ['all', 'detail']
|
||||
115
nonebot_plugin_tetris_stats/games/tetrio/rank/all.py
Normal file
115
nonebot_plugin_tetris_stats/games/tetrio/rank/all.py
Normal file
@@ -0,0 +1,115 @@
|
||||
from datetime import timedelta
|
||||
|
||||
from arclet.alconna import Arg
|
||||
from nonebot_plugin_alconna import Option, Subcommand, UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from ....db import trigger
|
||||
from ....utils.host import HostPage, get_self_netloc
|
||||
from ....utils.metrics import get_metrics
|
||||
from ....utils.render import render
|
||||
from ....utils.render.schemas.tetrio.rank.v1 import Data as DataV1
|
||||
from ....utils.render.schemas.tetrio.rank.v1 import ItemData as ItemDataV1
|
||||
from ....utils.render.schemas.tetrio.rank.v2 import AverageData as AverageDataV2
|
||||
from ....utils.render.schemas.tetrio.rank.v2 import Data as DataV2
|
||||
from ....utils.render.schemas.tetrio.rank.v2 import ItemData as ItemDataV2
|
||||
from ....utils.screenshot import screenshot
|
||||
from .. import alc
|
||||
from ..constant import GAME_TYPE
|
||||
from ..models import TETRIOLeagueStats
|
||||
from ..typing import Template
|
||||
from . import command
|
||||
|
||||
command.add(
|
||||
Subcommand(
|
||||
'--all', Option('--template', Arg('template', Template), alias=['-T'], help_text='要使用的查询模板'), dest='all'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@alc.assign('TETRIO.rank.all')
|
||||
async def _(event_session: EventSession, template: Template | None = None):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='rank',
|
||||
command_args=['--all'] + ([f'--template {template}'] if template is not None else []),
|
||||
):
|
||||
async with get_session() as session:
|
||||
latest_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOLeagueStats)
|
||||
.order_by(TETRIOLeagueStats.id.desc())
|
||||
.limit(1)
|
||||
.options(selectinload(TETRIOLeagueStats.fields))
|
||||
)
|
||||
).one()
|
||||
compare_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOLeagueStats)
|
||||
.order_by(
|
||||
func.abs(
|
||||
func.julianday(TETRIOLeagueStats.update_time)
|
||||
- func.julianday(latest_data.update_time - timedelta(hours=24))
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
.options(selectinload(TETRIOLeagueStats.fields))
|
||||
)
|
||||
).one()
|
||||
match template:
|
||||
case 'v1' | None:
|
||||
await UniMessage.image(raw=await make_image_v1(latest_data, compare_data)).finish()
|
||||
case 'v2':
|
||||
await UniMessage.image(raw=await make_image_v2(latest_data, compare_data)).finish()
|
||||
|
||||
|
||||
async def make_image_v1(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeagueStats) -> bytes:
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v1/tetrio/rank',
|
||||
DataV1(
|
||||
items={
|
||||
i[0].rank: ItemDataV1(
|
||||
trending=round(i[0].tr_line - i[1].tr_line, 2),
|
||||
require_tr=round(i[0].tr_line, 2),
|
||||
players=i[0].player_count,
|
||||
)
|
||||
for i in zip(latest_data.fields, compare_data.fields, strict=True)
|
||||
},
|
||||
updated_at=latest_data.update_time,
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
|
||||
|
||||
async def make_image_v2(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeagueStats) -> bytes:
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v2/tetrio/rank',
|
||||
DataV2(
|
||||
items={
|
||||
i[0].rank: ItemDataV2(
|
||||
require_tr=round(i[0].tr_line, 2),
|
||||
trending=round(i[0].tr_line - i[1].tr_line, 2),
|
||||
average_data=AverageDataV2(
|
||||
pps=(metrics := get_metrics(pps=i[0].avg_pps, apm=i[0].avg_apm, vs=i[0].avg_vs)).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs,
|
||||
adpl=metrics.adpl,
|
||||
),
|
||||
players=i[0].player_count,
|
||||
)
|
||||
for i in zip(latest_data.fields, compare_data.fields, strict=True)
|
||||
},
|
||||
updated_at=latest_data.update_time,
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
128
nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py
Normal file
128
nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py
Normal file
@@ -0,0 +1,128 @@
|
||||
from datetime import timedelta, timezone
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from arclet.alconna import Arg
|
||||
from nonebot import get_driver
|
||||
from nonebot_plugin_alconna import Option, UniMessage
|
||||
from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import selectinload
|
||||
|
||||
from ....db import trigger
|
||||
from ....utils.host import HostPage, get_self_netloc
|
||||
from ....utils.metrics import get_metrics
|
||||
from ....utils.render import render
|
||||
from ....utils.render.schemas.tetrio.rank.detail import Data, SpecialData
|
||||
from ....utils.screenshot import screenshot
|
||||
from .. import alc
|
||||
from ..api.typing import ValidRank
|
||||
from ..constant import GAME_TYPE
|
||||
from ..models import TETRIOLeagueStats
|
||||
from . import command
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
command.add(Option('--detail', Arg('rank', ValidRank), alias=['-D']))
|
||||
|
||||
|
||||
@alc.assign('TETRIO.rank')
|
||||
async def _(rank: ValidRank, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
command_type='rank',
|
||||
command_args=[f'--detail {rank}'],
|
||||
):
|
||||
async with get_session() as session:
|
||||
latest_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOLeagueStats)
|
||||
.order_by(TETRIOLeagueStats.id.desc())
|
||||
.limit(1)
|
||||
.options(selectinload(TETRIOLeagueStats.fields))
|
||||
)
|
||||
).one()
|
||||
compare_data = (
|
||||
await session.scalars(
|
||||
select(TETRIOLeagueStats)
|
||||
.order_by(
|
||||
func.abs(
|
||||
func.julianday(TETRIOLeagueStats.update_time)
|
||||
- func.julianday(latest_data.update_time - timedelta(hours=24))
|
||||
)
|
||||
)
|
||||
.limit(1)
|
||||
.options(selectinload(TETRIOLeagueStats.fields))
|
||||
)
|
||||
).one()
|
||||
await UniMessage.image(
|
||||
raw=await make_image(
|
||||
rank,
|
||||
latest_data,
|
||||
compare_data,
|
||||
)
|
||||
).finish()
|
||||
|
||||
|
||||
async def make_image(rank: ValidRank, latest: TETRIOLeagueStats, compare: TETRIOLeagueStats) -> bytes:
|
||||
latest_data = next(filter(lambda x: x.rank == rank, latest.fields))
|
||||
compare_data = next(filter(lambda x: x.rank == rank, compare.fields))
|
||||
avg = get_metrics(pps=latest_data.avg_pps, apm=latest_data.avg_apm, vs=latest_data.avg_vs)
|
||||
low_pps = get_metrics(
|
||||
pps=latest_data.low_pps.league.pps, apm=latest_data.low_pps.league.apm, vs=latest_data.low_pps.league.vs
|
||||
)
|
||||
low_apm = get_metrics(
|
||||
pps=latest_data.low_apm.league.pps, apm=latest_data.low_apm.league.apm, vs=latest_data.low_apm.league.vs
|
||||
)
|
||||
low_vs = get_metrics(
|
||||
pps=latest_data.low_vs.league.pps, apm=latest_data.low_vs.league.apm, vs=latest_data.low_vs.league.vs
|
||||
)
|
||||
max_pps = get_metrics(
|
||||
pps=latest_data.high_pps.league.pps, apm=latest_data.high_pps.league.apm, vs=latest_data.high_pps.league.vs
|
||||
)
|
||||
max_apm = get_metrics(
|
||||
pps=latest_data.high_apm.league.pps, apm=latest_data.high_apm.league.apm, vs=latest_data.high_apm.league.vs
|
||||
)
|
||||
max_vs = get_metrics(
|
||||
pps=latest_data.high_vs.league.pps, apm=latest_data.high_vs.league.apm, vs=latest_data.high_vs.league.vs
|
||||
)
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v2/tetrio/rank/detail',
|
||||
Data(
|
||||
name=latest_data.rank,
|
||||
trending=round(latest_data.tr_line - compare_data.tr_line, 2),
|
||||
require_tr=round(latest_data.tr_line, 2),
|
||||
players=latest_data.player_count,
|
||||
minimum_data=SpecialData(
|
||||
apm=low_apm.apm,
|
||||
pps=low_pps.pps,
|
||||
lpm=low_pps.lpm,
|
||||
vs=low_vs.vs,
|
||||
adpm=low_vs.adpm,
|
||||
apm_holder=latest_data.low_apm.username.upper(),
|
||||
pps_holder=latest_data.low_pps.username.upper(),
|
||||
vs_holder=latest_data.low_vs.username.upper(),
|
||||
),
|
||||
average_data=SpecialData(
|
||||
apm=avg.apm, pps=avg.pps, lpm=avg.lpm, vs=avg.vs, adpm=avg.adpm, apl=avg.apl, adpl=avg.adpl
|
||||
),
|
||||
maximum_data=SpecialData(
|
||||
apm=max_apm.apm,
|
||||
pps=max_pps.pps,
|
||||
lpm=max_pps.lpm,
|
||||
vs=max_vs.vs,
|
||||
adpm=max_vs.adpm,
|
||||
apm_holder=latest_data.high_apm.username.upper(),
|
||||
pps_holder=latest_data.high_pps.username.upper(),
|
||||
vs_holder=latest_data.high_vs.username.upper(),
|
||||
),
|
||||
updated_at=latest.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo('Asia/Shanghai')),
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
@@ -1,7 +1,6 @@
|
||||
from asyncio import gather
|
||||
from datetime import timedelta
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
@@ -11,6 +10,7 @@ from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user
|
||||
from yarl import URL
|
||||
|
||||
from ....db import query_bind_info, trigger
|
||||
from ....utils.exception import RecordNotFoundError
|
||||
@@ -94,7 +94,9 @@ async def make_blitz_image(player: Player) -> bytes:
|
||||
user=User(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await player.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from asyncio import gather
|
||||
from datetime import timedelta
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
@@ -11,6 +10,7 @@ from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user
|
||||
from yarl import URL
|
||||
|
||||
from ....db import query_bind_info, trigger
|
||||
from ....utils.exception import RecordNotFoundError
|
||||
@@ -95,7 +95,9 @@ async def make_sprint_image(player: Player) -> bytes:
|
||||
user=User(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await player.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
from contextlib import suppress
|
||||
from datetime import datetime, timezone
|
||||
from io import StringIO
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from lxml import etree
|
||||
from pandas import read_html
|
||||
|
||||
from ....config.config import config
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.request import Request, splice_url
|
||||
from ....utils.request import Request
|
||||
from ..constant import BASE_URL, USER_NAME
|
||||
from .models import TOPHistoricalData
|
||||
from .schemas.user import User
|
||||
@@ -15,6 +15,8 @@ from .schemas.user_profile import Data, UserProfile
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.top or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Player:
|
||||
def __init__(self, *, user_name: str, trust: bool = False) -> None:
|
||||
@@ -35,8 +37,7 @@ class Player:
|
||||
async def get_profile(self) -> UserProfile:
|
||||
"""获取用户信息"""
|
||||
if self._user_profile is None:
|
||||
url = splice_url([BASE_URL, 'profile.php', f'?{urlencode({"user":self.user_name})}'])
|
||||
raw_user_profile = await Request.request(url, is_json=False)
|
||||
raw_user_profile = await request.request(BASE_URL / 'profile.php' % {'user': self.user_name}, is_json=False)
|
||||
self._user_profile = self._parse_profile(raw_user_profile)
|
||||
await anti_duplicate_add(
|
||||
TOPHistoricalData(
|
||||
@@ -68,4 +69,4 @@ class Player:
|
||||
total: list[Data] = []
|
||||
for _, value in dataframe.iterrows():
|
||||
total.append(Data(lpm=value['lpm'], apm=value['apm']))
|
||||
return UserProfile(user_name=user_name, today=today, total=total)
|
||||
return UserProfile(user_name=user_name, today=today, total=total or None)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
GAME_TYPE: Literal['TOP'] = 'TOP'
|
||||
|
||||
BASE_URL = 'http://tetrisonline.pl/top/'
|
||||
BASE_URL = URL('http://tetrisonline.pl/top/')
|
||||
|
||||
USER_NAME = compile(r'^[a-zA-Z0-9_]{1,16}$')
|
||||
|
||||
@@ -78,7 +78,7 @@ async def make_query_image(profile: UserProfile) -> bytes:
|
||||
await render(
|
||||
'v1/top/info',
|
||||
Info(
|
||||
user=People(avatar=get_avatar(), name=profile.user_name),
|
||||
user=People(avatar=get_avatar(profile.user_name), name=profile.user_name),
|
||||
today=InfoData(pps=today.pps, lpm=today.lpm, apm=today.apm, apl=today.apl),
|
||||
history=InfoData(pps=history.pps, lpm=history.lpm, apm=history.apm, apl=history.apl),
|
||||
),
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import overload
|
||||
from urllib.parse import urlencode
|
||||
from typing import cast, overload
|
||||
|
||||
from httpx import TimeoutException
|
||||
from nonebot.compat import type_validate_json
|
||||
from yarl import URL
|
||||
|
||||
from ....config.config import config
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import Request, splice_url
|
||||
from ....utils.request import Request
|
||||
from ..constant import BASE_URL, USER_NAME
|
||||
from .models import TOSHistoricalData
|
||||
from .schemas.user import User
|
||||
@@ -16,6 +17,8 @@ from .schemas.user_profile import UserProfile
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.tos or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Player:
|
||||
@overload
|
||||
@@ -56,29 +59,14 @@ class Player:
|
||||
async def get_info(self) -> UserInfoSuccess:
|
||||
"""获取用户信息"""
|
||||
if self._user_info is None:
|
||||
if self.teaid is not None:
|
||||
url = [
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getTeaIdInfo',
|
||||
f'?{urlencode({"teaId":self.teaid})}',
|
||||
]
|
||||
)
|
||||
for i in BASE_URL
|
||||
]
|
||||
else:
|
||||
url = [
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getUsernameInfo',
|
||||
f'?{urlencode({"username":self.user_name})}',
|
||||
]
|
||||
)
|
||||
for i in BASE_URL
|
||||
]
|
||||
raw_user_info = await Request.failover_request(url, failover_code=[502], failover_exc=(TimeoutException,))
|
||||
path = str(
|
||||
URL('getTeaIdInfo') % {'teaId': self.teaid}
|
||||
if self.teaid is not None
|
||||
else URL('getUsernameInfo') % {'username': cast(str, self.user_name)}
|
||||
)
|
||||
raw_user_info = await request.failover_request(
|
||||
[i / path for i in BASE_URL], failover_code=[502], failover_exc=(TimeoutException,)
|
||||
)
|
||||
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
|
||||
if not isinstance(user_info, UserInfoSuccess):
|
||||
msg = f'用户信息请求错误:\n{user_info.error}'
|
||||
@@ -98,17 +86,11 @@ class Player:
|
||||
"""获取用户数据"""
|
||||
if other_parameter is None:
|
||||
other_parameter = {}
|
||||
params = urlencode(dict(sorted(other_parameter.items())))
|
||||
params = (URL('') % dict(sorted(other_parameter.items()))).human_repr()
|
||||
if self._user_profile.get(params) is None:
|
||||
raw_user_profile = await Request.failover_request(
|
||||
raw_user_profile = await request.failover_request(
|
||||
[
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getProfile',
|
||||
f'?{urlencode({"id":self.teaid or self.user_name,**other_parameter})}',
|
||||
]
|
||||
)
|
||||
i / 'getProfile' % {'id': self.teaid or cast(str, self.user_name), **other_parameter}
|
||||
for i in BASE_URL
|
||||
],
|
||||
failover_code=[502],
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
GAME_TYPE: Literal['TOS'] = 'TOS'
|
||||
|
||||
BASE_URL = {
|
||||
'https://teatube.cn:8888/',
|
||||
'http://cafuuchino1.studio26f.org:19970',
|
||||
URL('https://teatube.cn:8888/'),
|
||||
URL('http://cafuuchino1.studio26f.org:19970'),
|
||||
}
|
||||
|
||||
USER_NAME = compile(
|
||||
|
||||
@@ -223,7 +223,7 @@ async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, even
|
||||
user=People(
|
||||
avatar=await get_avatar(event_user_info, 'Data URI', None)
|
||||
if event_user_info is not None
|
||||
else get_random_avatar(),
|
||||
else get_random_avatar(user_info.data.teaid),
|
||||
name=user_info.data.name,
|
||||
),
|
||||
ranking=Ranking(rating=float(user_info.data.ranking), rd=round(float(user_info.data.rd_now), 2)),
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import sys
|
||||
from collections.abc import Callable, Coroutine
|
||||
from os import environ
|
||||
from platform import system
|
||||
from re import sub
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from playwright.__main__ import main
|
||||
from playwright.async_api import Browser, async_playwright
|
||||
from playwright.async_api import Browser, BrowserContext, async_playwright
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
@@ -27,6 +29,7 @@ class BrowserManager:
|
||||
"""浏览器管理类"""
|
||||
|
||||
_browser: Browser | None = None
|
||||
_contexts: ClassVar[dict[str, BrowserContext]] = {}
|
||||
|
||||
@classmethod
|
||||
async def init_playwright(cls) -> None:
|
||||
@@ -72,7 +75,11 @@ class BrowserManager:
|
||||
async def _start_browser(cls) -> Browser:
|
||||
"""启动浏览器实例"""
|
||||
playwright = await async_playwright().start()
|
||||
cls._browser = await playwright.firefox.launch()
|
||||
cls._browser = await playwright.firefox.launch(
|
||||
firefox_user_prefs={
|
||||
'network.http.max-persistent-connections-per-server': 64,
|
||||
},
|
||||
)
|
||||
return cls._browser
|
||||
|
||||
@classmethod
|
||||
@@ -80,8 +87,26 @@ class BrowserManager:
|
||||
"""获取浏览器实例"""
|
||||
return cls._browser or await cls._start_browser()
|
||||
|
||||
@classmethod
|
||||
async def get_context(
|
||||
cls, context_id: str = 'default', factory: Callable[[], Coroutine[Any, Any, BrowserContext]] | None = None
|
||||
) -> BrowserContext:
|
||||
"""获取浏览器上下文"""
|
||||
return cls._contexts.setdefault(
|
||||
context_id, await factory() if factory is not None else await (await cls.get_browser()).new_context()
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def del_context(cls, context_id: str) -> None:
|
||||
"""删除浏览器上下文"""
|
||||
if context_id in cls._contexts:
|
||||
await cls._contexts[context_id].close()
|
||||
del cls._contexts[context_id]
|
||||
|
||||
@classmethod
|
||||
async def close_browser(cls) -> None:
|
||||
"""关闭浏览器实例"""
|
||||
for i in cls._contexts.values():
|
||||
await i.close()
|
||||
if isinstance(cls._browser, Browser):
|
||||
await cls._browser.close()
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
from functools import cache
|
||||
from hashlib import sha256
|
||||
from ipaddress import IPv4Address, IPv6Address
|
||||
from pathlib import Path as FilePath
|
||||
from typing import TYPE_CHECKING, ClassVar, Literal
|
||||
|
||||
from fastapi import FastAPI, Path, status
|
||||
from aiofiles import open
|
||||
from fastapi import BackgroundTasks, FastAPI, Path, status
|
||||
from fastapi.responses import FileResponse, HTMLResponse, Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from nonebot import get_app, get_driver
|
||||
from nonebot.log import logger
|
||||
from yarl import URL
|
||||
|
||||
from ..config.config import CACHE_PATH
|
||||
from ..games.tetrio.api.cache import request
|
||||
from .image import img_to_png
|
||||
from .request import Request
|
||||
from .templates import TEMPLATES_DIR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -22,6 +26,7 @@ driver = get_driver()
|
||||
|
||||
global_config = driver.config
|
||||
|
||||
BASE_URL = URL('https://tetr.io/user-content/')
|
||||
|
||||
if not isinstance(app, FastAPI):
|
||||
msg = '本插件需要 FastAPI 驱动器才能运行'
|
||||
@@ -63,20 +68,30 @@ def _(page_hash: str) -> HTMLResponse:
|
||||
|
||||
@app.get('/host/resource/tetrio/{resource_type}/{user_id}', status_code=status.HTTP_200_OK)
|
||||
async def _(
|
||||
resource_type: Literal['avatars', 'banners'], revision: int, user_id: str = Path(regex=r'^[a-f0-9]{24}$')
|
||||
resource_type: Literal['avatars', 'banners'],
|
||||
revision: int,
|
||||
background_tasks: BackgroundTasks,
|
||||
user_id: str = Path(regex=r'^[a-f0-9]{24}$'),
|
||||
) -> Response:
|
||||
if not (path := CACHE_PATH / 'tetrio' / resource_type / f'{user_id}_{revision}.png').exists():
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(
|
||||
img_to_png(
|
||||
await Request.request(
|
||||
f'https://tetr.io/user-content/{resource_type}/{user_id}.jpg?rv={revision}', is_json=False
|
||||
)
|
||||
image = img_to_png(
|
||||
await request.request(
|
||||
BASE_URL / resource_type / f'{user_id}.jpg' % {'rv': revision},
|
||||
is_json=False,
|
||||
)
|
||||
)
|
||||
background_tasks.add_task(write_cache, path=path, data=image)
|
||||
return Response(content=image, media_type='image/png')
|
||||
return FileResponse(path)
|
||||
|
||||
|
||||
async def write_cache(path: FilePath, data: bytes) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with open(path, 'wb') as file:
|
||||
await file.write(data)
|
||||
|
||||
|
||||
@cache
|
||||
def get_self_netloc() -> str:
|
||||
host: IPv4Address | IPv6Address | IPvAnyAddress = global_config.host
|
||||
if isinstance(host, IPv4Address):
|
||||
|
||||
33
nonebot_plugin_tetris_stats/utils/limit.py
Normal file
33
nonebot_plugin_tetris_stats/utils/limit.py
Normal file
@@ -0,0 +1,33 @@
|
||||
from asyncio import Lock, sleep
|
||||
from collections.abc import Callable, Coroutine
|
||||
from datetime import timedelta
|
||||
from functools import wraps
|
||||
from time import time
|
||||
from typing import Any, ParamSpec, TypeVar
|
||||
|
||||
from nonebot.log import logger
|
||||
|
||||
P = ParamSpec('P')
|
||||
T = TypeVar('T')
|
||||
|
||||
|
||||
def limit(limit: timedelta) -> Callable[[Callable[P, Coroutine[Any, Any, T]]], Callable[P, Coroutine[Any, Any, T]]]:
|
||||
limit_seconds = limit.total_seconds()
|
||||
|
||||
def decorator(func: Callable[P, Coroutine[Any, Any, T]]) -> Callable[P, Coroutine[Any, Any, T]]:
|
||||
last_call = 0.0
|
||||
lock = Lock()
|
||||
|
||||
@wraps(func)
|
||||
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
|
||||
nonlocal last_call
|
||||
async with lock:
|
||||
if (diff := (time() - last_call)) < limit_seconds:
|
||||
logger.debug(f'request limit {(limit_time:=limit_seconds-diff)}s')
|
||||
await sleep(limit_time)
|
||||
last_call = time()
|
||||
return await func(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
@@ -1,6 +1,6 @@
|
||||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
from random import choice, randint
|
||||
from random import Random
|
||||
|
||||
from PIL import Image
|
||||
from PIL.Image import Resampling
|
||||
@@ -8,12 +8,13 @@ from PIL.Image import Resampling
|
||||
from .draw import PIECE_MEMBERS, SkinManager
|
||||
|
||||
|
||||
def get_avatar() -> str:
|
||||
def get_avatar(send: float | str | bytes | bytearray | None = None) -> str:
|
||||
random = Random(send) # noqa: S311
|
||||
skin = (
|
||||
SkinManager.get_skin()
|
||||
.get_piece(choice(PIECE_MEMBERS)) # noqa: S311
|
||||
SkinManager.get_skin(send)
|
||||
.get_piece(random.choice(PIECE_MEMBERS))
|
||||
.rotate(
|
||||
randint(-360, 360), # noqa: S311
|
||||
random.randint(-360, 360),
|
||||
expand=True,
|
||||
resample=Resampling.BICUBIC,
|
||||
)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from random import choice
|
||||
from random import Random
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from PIL.Image import Image
|
||||
@@ -151,8 +151,8 @@ class SkinManager:
|
||||
cls.skin.append(skin)
|
||||
|
||||
@classmethod
|
||||
def get_skin(cls) -> 'Skin':
|
||||
return choice(cls.skin) # noqa: S311
|
||||
def get_skin(cls, send: float | str | bytes | bytearray | None = None) -> 'Skin':
|
||||
return Random(send).choice(cls.skin) # noqa: S311
|
||||
|
||||
|
||||
class Skin(ABC):
|
||||
|
||||
@@ -90,5 +90,5 @@ class TechSkin(Skin):
|
||||
@driver.on_startup
|
||||
def _():
|
||||
path = Path(__file__).parent / 'skins'
|
||||
for i in path.iterdir():
|
||||
for i in sorted(path.iterdir()):
|
||||
TechSkin(i)
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ......games.tetrio.api.typing import Rank, ValidRank
|
||||
from ......games.tetrio.api.typing import Rank
|
||||
from .....typing import Number
|
||||
from ...base import Avatar
|
||||
from .base import TetraLeagueHistoryData
|
||||
@@ -21,7 +21,7 @@ class User(BaseModel):
|
||||
name: str
|
||||
country: str | None
|
||||
|
||||
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop', 'banned']
|
||||
role: Literal['anon', 'user', 'bot', 'halfmod', 'mod', 'admin', 'sysop', 'hidden', 'banned']
|
||||
|
||||
avatar: str | Avatar
|
||||
banner: str | None
|
||||
@@ -31,7 +31,6 @@ class User(BaseModel):
|
||||
friend_count: int | None
|
||||
supporter_tier: int
|
||||
|
||||
verified: bool
|
||||
bad_standing: bool
|
||||
|
||||
badges: list[Badge]
|
||||
@@ -53,7 +52,7 @@ class TetraLeagueStatistic(BaseModel):
|
||||
|
||||
class TetraLeague(BaseModel):
|
||||
rank: Rank
|
||||
highest_rank: ValidRank
|
||||
highest_rank: Rank
|
||||
|
||||
tr: Number
|
||||
|
||||
|
||||
@@ -26,7 +26,6 @@ class User(BaseModel):
|
||||
name: str
|
||||
avatar: str | Avatar
|
||||
country: str | None
|
||||
verified: bool
|
||||
tetra_league: TetraLeague
|
||||
xp: Number
|
||||
join_at: datetime | None
|
||||
|
||||
@@ -1,54 +1,79 @@
|
||||
from collections.abc import Sequence
|
||||
from http import HTTPStatus
|
||||
from urllib.parse import urljoin, urlparse
|
||||
from typing import Any
|
||||
|
||||
from aiofiles import open
|
||||
from httpx import AsyncClient, HTTPError
|
||||
from nonebot import get_driver, get_plugin_config
|
||||
from msgspec import DecodeError, Struct, json
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from playwright.async_api import Response
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
from yarl import URL
|
||||
|
||||
from ..config.config import CACHE_PATH, Config
|
||||
from ..config.config import CACHE_PATH, config
|
||||
from .browser import BrowserManager
|
||||
from .exception import RequestError
|
||||
|
||||
driver = get_driver()
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await Request.init_cache()
|
||||
await Request.read_cache()
|
||||
class CloudflareCache(Struct):
|
||||
headers: dict[str, Any] | None = None
|
||||
cookies: dict[str, Any] | None = None
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await Request.write_cache()
|
||||
encoder = json.Encoder()
|
||||
decoder = json.Decoder()
|
||||
|
||||
|
||||
def splice_url(url_list: list[str]) -> str:
|
||||
url = ''
|
||||
if len(url_list):
|
||||
url = url_list.pop(0)
|
||||
for i in url_list:
|
||||
url = urljoin(url, i)
|
||||
return url
|
||||
class AntiCloudflare:
|
||||
cache_decoder = json.Decoder(type=CloudflareCache)
|
||||
|
||||
def __init__(self, domain_suffix: str) -> None:
|
||||
self.domain_suffix = domain_suffix
|
||||
self.cache_path = CACHE_PATH / f'{self.domain_suffix}_cloudflare_cache.json'
|
||||
self._headers: dict | None = None
|
||||
self._cookies: dict | None = None
|
||||
self.read_cache()
|
||||
|
||||
class Request:
|
||||
"""网络请求相关类"""
|
||||
def read_cache(self) -> None:
|
||||
"""读取缓存文件"""
|
||||
try:
|
||||
cache: CloudflareCache = self.cache_decoder.decode(self.cache_path.read_text(encoding='UTF-8'))
|
||||
self._headers = cache.headers
|
||||
self._cookies = cache.cookies
|
||||
except (OSError, DecodeError):
|
||||
self.cache_path.unlink()
|
||||
self.write_cache()
|
||||
|
||||
_CACHE_FILE = CACHE_PATH / 'cloudflare_cache.json'
|
||||
_headers: dict | None = None
|
||||
_cookies: dict | None = None
|
||||
def write_cache(self) -> None:
|
||||
"""写入缓存文件"""
|
||||
self.cache_path.write_bytes(json.encode(CloudflareCache(headers=self.headers, cookies=self.cookies)))
|
||||
|
||||
@classmethod
|
||||
async def _anti_cloudflare(cls, url: str) -> bytes:
|
||||
@property
|
||||
def headers(self) -> dict | None:
|
||||
return self._headers
|
||||
|
||||
@headers.setter
|
||||
def headers(self, value: dict | None) -> None:
|
||||
self._headers = value
|
||||
self.write_cache()
|
||||
|
||||
@property
|
||||
def cookies(self) -> dict | None:
|
||||
return self._cookies
|
||||
|
||||
@cookies.setter
|
||||
def cookies(self, value: dict | None) -> None:
|
||||
self._cookies = value
|
||||
self.write_cache()
|
||||
|
||||
async def __call__(self, url: str, proxy: str | None = None) -> bytes:
|
||||
"""用firefox硬穿五秒盾"""
|
||||
browser = await BrowserManager.get_browser()
|
||||
async with await browser.new_context() as context, await context.new_page() as page:
|
||||
async with (
|
||||
await browser.new_context(proxy={'server': proxy} if proxy is not None else None) as context,
|
||||
await context.new_page() as page,
|
||||
):
|
||||
response = await page.goto(url)
|
||||
attempts = 0
|
||||
while attempts < 60: # noqa: PLR2004
|
||||
@@ -61,84 +86,70 @@ class Request:
|
||||
logger.warning('疑似触发了 Cloudflare 的验证码')
|
||||
break
|
||||
try:
|
||||
loads(text)
|
||||
except JSONDecodeError:
|
||||
decoder.decode(text)
|
||||
except DecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
if not isinstance(response, Response):
|
||||
msg = 'api请求失败'
|
||||
raise RequestError(msg)
|
||||
cls._headers = await response.request.all_headers()
|
||||
self.headers = await response.request.all_headers()
|
||||
try:
|
||||
cls._cookies = {
|
||||
self.cookies = {
|
||||
name: value
|
||||
for i in await context.cookies()
|
||||
if (name := i.get('name')) is not None and (value := i.get('value')) is not None
|
||||
}
|
||||
except KeyError:
|
||||
cls._cookies = None
|
||||
self.cookies = None
|
||||
return await response.body()
|
||||
msg = '绕过五秒盾失败'
|
||||
raise RequestError(msg)
|
||||
|
||||
@classmethod
|
||||
async def init_cache(cls) -> None:
|
||||
"""初始化缓存文件"""
|
||||
if not cls._CACHE_FILE.exists():
|
||||
async with open(file=cls._CACHE_FILE, mode='w', encoding='UTF-8') as file:
|
||||
await file.write(dumps({'headers': cls._headers, 'cookies': cls._cookies}))
|
||||
|
||||
@classmethod
|
||||
async def read_cache(cls) -> None:
|
||||
"""读取缓存文件"""
|
||||
try:
|
||||
async with open(file=cls._CACHE_FILE, mode='r', encoding='UTF-8') as file:
|
||||
json = loads(await file.read())
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except (PermissionError, JSONDecodeError):
|
||||
cls._CACHE_FILE.unlink()
|
||||
await cls.init_cache()
|
||||
else:
|
||||
cls._headers = json['headers']
|
||||
cls._cookies = json['cookies']
|
||||
class Request:
|
||||
"""网络请求相关类"""
|
||||
|
||||
@classmethod
|
||||
async def write_cache(cls) -> None:
|
||||
"""写入缓存文件"""
|
||||
try:
|
||||
async with open(file=cls._CACHE_FILE, mode='r+', encoding='UTF-8') as file:
|
||||
await file.write(dumps({'headers': cls._headers, 'cookies': cls._cookies}))
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except (PermissionError, JSONDecodeError):
|
||||
cls._CACHE_FILE.unlink()
|
||||
await cls.init_cache()
|
||||
def __init__(self, proxy: str | None) -> None:
|
||||
self.proxy = proxy
|
||||
self.anti_cloudflares: dict[str, AntiCloudflare] = {}
|
||||
|
||||
@classmethod
|
||||
async def request(cls, url: str, *, is_json: bool = True) -> bytes:
|
||||
async def request(
|
||||
self,
|
||||
url: URL,
|
||||
extra_headers: dict | None = None,
|
||||
*,
|
||||
is_json: bool = True,
|
||||
enable_anti_cloudflare: bool = False,
|
||||
) -> bytes:
|
||||
"""请求api"""
|
||||
if (anti_cloudflare := self.anti_cloudflares.get(url.host or '')) is not None:
|
||||
cookies = anti_cloudflare.cookies
|
||||
headers = anti_cloudflare.headers
|
||||
else:
|
||||
cookies = None
|
||||
headers = None
|
||||
headers = headers if extra_headers is None else extra_headers if headers is None else headers | extra_headers
|
||||
try:
|
||||
async with AsyncClient(cookies=cls._cookies, timeout=config.tetris_req_timeout) as session:
|
||||
response = await session.get(url, headers=cls._headers)
|
||||
async with AsyncClient(cookies=cookies, timeout=config.tetris.request_timeout, proxy=self.proxy) as session:
|
||||
response = await session.get(str(url), headers=headers)
|
||||
if response.status_code != HTTPStatus.OK:
|
||||
msg = f'请求错误 code: {response.status_code} {HTTPStatus(response.status_code).phrase}\n{response.text}'
|
||||
raise RequestError(msg, status_code=response.status_code)
|
||||
if is_json:
|
||||
loads(response.content)
|
||||
decoder.decode(response.content)
|
||||
return response.content
|
||||
except HTTPError as e:
|
||||
msg = f'请求错误 \n{e!r}'
|
||||
raise RequestError(msg) from e
|
||||
except JSONDecodeError:
|
||||
if urlparse(url).netloc.lower().endswith('tetr.io'):
|
||||
return await cls._anti_cloudflare(url)
|
||||
except DecodeError: # 由于捕获的是 DecodeError 所以一定是 is_json = True
|
||||
if enable_anti_cloudflare and url.host is not None:
|
||||
return await self.anti_cloudflares.setdefault(url.host, AntiCloudflare(url.host))(str(url), self.proxy)
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
async def failover_request(
|
||||
cls,
|
||||
urls: Sequence[str],
|
||||
self,
|
||||
urls: Sequence[URL],
|
||||
*,
|
||||
failover_code: Sequence[int],
|
||||
failover_exc: tuple[type[BaseException], ...],
|
||||
@@ -148,7 +159,7 @@ class Request:
|
||||
for i in urls:
|
||||
logger.debug(f'尝试请求 {i}')
|
||||
try:
|
||||
return await cls.request(i, is_json=is_json)
|
||||
return await self.request(i, is_json=is_json)
|
||||
except RequestError as e:
|
||||
if e.status_code in failover_code: # 如果状态码在 failover_code 中, 则继续尝试下一个URL
|
||||
error_list.append(e)
|
||||
|
||||
@@ -1,23 +1,21 @@
|
||||
from nonebot import get_plugin_config
|
||||
from playwright.async_api import TimeoutError, ViewportSize
|
||||
from playwright.async_api import BrowserContext, TimeoutError, ViewportSize
|
||||
|
||||
from ..config.config import Config
|
||||
from ..config.config import config
|
||||
from .browser import BrowserManager
|
||||
from .retry import retry
|
||||
from .time_it import time_it
|
||||
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
async def context_factory() -> BrowserContext:
|
||||
return await (await BrowserManager.get_browser()).new_context(device_scale_factor=config.tetris.screenshot_quality)
|
||||
|
||||
|
||||
@retry(exception_type=TimeoutError, reply='截图失败, 重试中')
|
||||
@time_it
|
||||
async def screenshot(url: str) -> bytes:
|
||||
browser = await BrowserManager.get_browser()
|
||||
async with (
|
||||
await browser.new_page(device_scale_factor=config.tetris_screenshot_quality) as page,
|
||||
):
|
||||
context = await BrowserManager.get_context('screenshot', factory=context_factory)
|
||||
async with await context.new_page() as page:
|
||||
await page.goto(url)
|
||||
await page.wait_for_load_state('networkidle')
|
||||
size: ViewportSize = await page.evaluate("""
|
||||
() => {
|
||||
const element = document.querySelector('#content');
|
||||
@@ -28,4 +26,5 @@ async def screenshot(url: str) -> bytes:
|
||||
};
|
||||
""")
|
||||
await page.set_viewport_size(size)
|
||||
return await page.locator('id=content').screenshot(timeout=5000, type='png')
|
||||
await page.wait_for_load_state('networkidle')
|
||||
return await page.locator('id=content').screenshot(animations='disabled', timeout=5000, type='png')
|
||||
|
||||
@@ -11,19 +11,20 @@ from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from nonebot.permission import SUPERUSER
|
||||
from nonebot_plugin_alconna import Alconna, Args, Option, on_alconna
|
||||
from nonebot_plugin_localstore import get_cache_file, get_data_dir
|
||||
from rich.progress import Progress
|
||||
|
||||
from ..config.config import CACHE_PATH, DATA_PATH, config
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
TEMPLATES_DIR = get_data_dir('nonebot_plugin_tetris_stats') / 'templates'
|
||||
TEMPLATES_DIR = DATA_PATH / 'templates'
|
||||
|
||||
alc = on_alconna(Alconna('更新模板', Option('--revision', Args['revision', str], alias={'-R'})), permission=SUPERUSER)
|
||||
|
||||
|
||||
async def download_templates(tag: str) -> Path:
|
||||
logger.info(f'开始下载模板 {tag}')
|
||||
async with AsyncClient() as client:
|
||||
async with AsyncClient(proxy=config.tetris.proxy.github or config.tetris.proxy.main) as client:
|
||||
if tag == 'latest':
|
||||
logger.info('目标为 latest, 正在获取最新版本号')
|
||||
tag = (
|
||||
@@ -36,7 +37,7 @@ async def download_templates(tag: str) -> Path:
|
||||
.rsplit('/', 1)[-1]
|
||||
)
|
||||
logger.success(f'获取到的最新版本号: {tag}')
|
||||
path = get_cache_file('nonebot_plugin_tetris_stats', f'dist_{time_ns()}.zip')
|
||||
path = CACHE_PATH / f'dist_{time_ns()}.zip'
|
||||
with Progress() as progress:
|
||||
task_id = progress.add_task('[red]Downloading...', total=None)
|
||||
async with (
|
||||
@@ -104,7 +105,7 @@ async def init_templates(tag: str) -> bool:
|
||||
|
||||
|
||||
async def check_tag(tag: str) -> bool:
|
||||
async with AsyncClient() as client:
|
||||
async with AsyncClient(proxy=config.tetris.proxy.github or config.tetris.proxy.main) as client:
|
||||
return (
|
||||
await client.get(f'https://github.com/A-Minos/tetris-stats-templates/releases/tag/{tag}')
|
||||
).status_code != HTTPStatus.NOT_FOUND
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
from typing import Literal
|
||||
from typing import Literal, TypeAlias
|
||||
|
||||
Number = float | int
|
||||
GameType = Literal['IO', 'TOP', 'TOS']
|
||||
BaseCommandType = Literal['bind', 'query']
|
||||
TETRIOCommandType = BaseCommandType | Literal['rank', 'config', 'list', 'record']
|
||||
AllCommandType = BaseCommandType | TETRIOCommandType
|
||||
Me = Literal[
|
||||
Number: TypeAlias = float | int
|
||||
GameType: TypeAlias = Literal['IO', 'TOP', 'TOS']
|
||||
BaseCommandType: TypeAlias = Literal['bind', 'query']
|
||||
TETRIOCommandType: TypeAlias = BaseCommandType | Literal['rank', 'config', 'list', 'record']
|
||||
AllCommandType: TypeAlias = BaseCommandType | TETRIOCommandType
|
||||
Me: TypeAlias = Literal[
|
||||
'我',
|
||||
'自己',
|
||||
'我等',
|
||||
|
||||
2231
poetry.lock
generated
2231
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = 'nonebot-plugin-tetris-stats'
|
||||
version = '1.4.7'
|
||||
version = '1.5.3'
|
||||
description = '一款基于 NoneBot2 的用于查询 Tetris 相关游戏数据的插件'
|
||||
authors = ['scdhh <wallfjjd@gmail.com>']
|
||||
readme = 'README.md'
|
||||
@@ -21,25 +21,28 @@ nonebot-plugin-user = '>=0.2,<0.5'
|
||||
nonebot-plugin-userinfo = '^0.2.4'
|
||||
aiocache = '^0.12.2'
|
||||
aiofiles = '>=23.2.1,<25.0.0'
|
||||
arclet-alconna = '<2.0.0'
|
||||
async-lru = '^2.0.4'
|
||||
httpx = '^0.27.0'
|
||||
jinja2 = '^3.1.3'
|
||||
lxml = '^5.1.0'
|
||||
msgspec = '^0.18.6'
|
||||
pandas = '>=1.4.3,<3.0.0'
|
||||
pillow = '^10.3.0'
|
||||
playwright = '^1.41.2'
|
||||
rich = '^13.7.1'
|
||||
ujson = '^5.9.0'
|
||||
yarl = '^1.9.4'
|
||||
zstandard = '>=0.22,<0.24'
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
basedpyright = '^1.17.0'
|
||||
mypy = '>=1.9'
|
||||
pandas-stubs = '>=1.5.2,<3.0.0'
|
||||
ruff = '>=0.3.0'
|
||||
types-aiofiles = '>=23.2.0.20240106,<25.0.0.0'
|
||||
types-lxml = '^2024.2.9'
|
||||
types-pillow = '^10.2.0.20240423'
|
||||
types-ujson = '^5.9.0'
|
||||
pandas-stubs = '>=1.5.2,<3.0.0'
|
||||
nonebot2 = { extras = ['all'], version = '^2.3.0' }
|
||||
nonebot-adapter-discord = '^0.1.3'
|
||||
nonebot-adapter-kaiheila = '^0.3.4'
|
||||
@@ -49,8 +52,10 @@ nonebot-adapter-satori = '>=0.11.4,<0.13.0'
|
||||
nonebot-plugin-orm = { extras = ['default'], version = '>=0.3,<0.8' }
|
||||
|
||||
[tool.poetry.group.debug.dependencies]
|
||||
matplotlib = '^3.9.2'
|
||||
memory-profiler = '^0.61.0'
|
||||
objprint = '^0.2.2'
|
||||
pyqt6 = '^6.7.1'
|
||||
viztracer = '^0.16.2'
|
||||
|
||||
[build-system]
|
||||
@@ -131,6 +136,11 @@ builtins-ignorelist = ['id']
|
||||
[tool.ruff.format]
|
||||
quote-style = 'single'
|
||||
|
||||
[tool.basedpyright]
|
||||
pythonVersion = '3.10'
|
||||
pythonPlatform = 'All'
|
||||
defineConstant = { PYDANTIC_V2 = true }
|
||||
typeCheckingMode = 'standard'
|
||||
|
||||
[tool.nonebot]
|
||||
plugins = ['nonebot_plugin_tetris_stats']
|
||||
# plugins = ['test_aps']
|
||||
|
||||
Reference in New Issue
Block a user