Compare commits
61 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7133cd9384 | |||
|
|
406bc7674e | ||
|
|
259b38fda5 | ||
|
|
414345ae5c | ||
|
|
341cbd86cd | ||
|
|
bf7804738e | ||
|
|
553f373671 | ||
| e53e164a52 | |||
| 2cd7d89c3e | |||
| b8b6d5f6c8 | |||
| 7a44c0dca5 | |||
| 4155d8eb42 | |||
| 4cc942d226 | |||
| 996dd565d8 | |||
| 5b0660e45b | |||
| 8d1ebc06d1 | |||
| c57aa48048 | |||
| ad90562fdf | |||
| cbc96fc09e | |||
| 8e10cfe0d0 | |||
| d192f0506d | |||
| 44aed656b8 | |||
| feb662b980 | |||
| ed6eb9a5cf | |||
| 25e281a4c5 | |||
| a2d69b9113 | |||
| c8907a47a4 | |||
| 9fb176b4bc | |||
|
|
53740265b6 | ||
|
|
e6119074ce | ||
| f7a2e89274 | |||
| 3fe5a19c4a | |||
| d35469cdef | |||
| 0cbae117aa | |||
| 25dc57d911 | |||
| 6042417b65 | |||
| 63cd94a0d7 | |||
| eb810d4bd2 | |||
| 52df4cf170 | |||
|
|
7a6615f6c9 | ||
|
|
c363908434 | ||
|
|
e26fb44106 | ||
|
|
7e2c04426a | ||
|
|
5910f05dfe | ||
| eebbd08551 | |||
| f035b844ab | |||
| 197b81f9cf | |||
| 5c2ffe13b0 | |||
| b0cff16dc6 | |||
|
|
3c952530d1 | ||
|
|
57dfc8b94a | ||
| 1065e62d11 | |||
| 02e703ea91 | |||
| 429f99f77e | |||
| 9a16e2fa21 | |||
| 0719d549b5 | |||
| 9cb2a90197 | |||
|
|
bb0606a144 | ||
|
|
41068f7152 | ||
|
|
6f98136c0f | ||
|
|
62335abaa6 |
@@ -2,12 +2,12 @@ default_install_hook_types: [pre-commit, prepare-commit-msg]
|
||||
ci:
|
||||
autofix_commit_msg: ':rotating_light: auto fix by pre-commit hooks'
|
||||
autofix_prs: true
|
||||
autoupdate_branch: master
|
||||
autoupdate_schedule: monthly
|
||||
autoupdate_branch: main
|
||||
autoupdate_schedule: weekly
|
||||
autoupdate_commit_msg: ':arrow_up: auto update by pre-commit hooks'
|
||||
repos:
|
||||
- repo: https://github.com/astral-sh/ruff-pre-commit
|
||||
rev: v0.4.10
|
||||
rev: v0.6.1
|
||||
hooks:
|
||||
- id: ruff
|
||||
args: [--fix, --exit-non-zero-on-fix]
|
||||
|
||||
@@ -1,13 +1,27 @@
|
||||
from pathlib import Path
|
||||
from nonebot import get_plugin_config
|
||||
from nonebot_plugin_localstore import get_cache_dir, get_data_dir
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from nonebot_plugin_localstore import get_cache_dir
|
||||
from pydantic import BaseModel
|
||||
CACHE_PATH = get_cache_dir('nonebot_plugin_tetris_stats')
|
||||
DATA_PATH = get_data_dir('nonebot_plugin_tetris_stats')
|
||||
|
||||
CACHE_PATH: Path = get_cache_dir('nonebot_plugin_tetris_stats')
|
||||
|
||||
class Proxy(BaseModel):
|
||||
main: str | None = None
|
||||
github: str | None = None
|
||||
tetrio: str | None = None
|
||||
tos: str | None = None
|
||||
top: str | None = None
|
||||
|
||||
|
||||
class ScopedConfig(BaseModel):
|
||||
request_timeout: float = 30.0
|
||||
screenshot_quality: float = 2
|
||||
proxy: Proxy = Field(default_factory=Proxy)
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
"""配置类"""
|
||||
tetris: ScopedConfig = Field(default_factory=ScopedConfig)
|
||||
|
||||
tetris_req_timeout: float = 30.0
|
||||
tetris_screenshot_quality: float = 2
|
||||
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
@@ -19,7 +19,6 @@ from sqlalchemy import desc, select
|
||||
from sqlalchemy.dialects import sqlite
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
from ujson import dumps, loads
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
@@ -31,6 +30,8 @@ depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def migrate_old_data() -> None:
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
Base.prepare(autoload_with=op.get_bind())
|
||||
OldHistoricalData = Base.classes.nonebot_plugin_tetris_stats_historicaldata # noqa: N806
|
||||
|
||||
@@ -13,7 +13,6 @@ from typing import TYPE_CHECKING
|
||||
from alembic import op
|
||||
from sqlalchemy.ext.automap import automap_base
|
||||
from sqlalchemy.orm import Session
|
||||
from ujson import dumps, loads
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
@@ -27,6 +26,7 @@ depends_on: str | Sequence[str] | None = None
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
connection = op.get_bind()
|
||||
@@ -50,6 +50,7 @@ def upgrade(name: str = '') -> None:
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
from json import dumps, loads
|
||||
|
||||
Base = automap_base() # noqa: N806
|
||||
connection = op.get_bind()
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
"""Extend api_type field length
|
||||
|
||||
迁移 ID: cfeab6961dce
|
||||
父迁移: f5b4a6d1325b
|
||||
创建时间: 2024-08-09 14:20:59.789030
|
||||
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
from nonebot.log import logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from collections.abc import Sequence
|
||||
|
||||
revision: str = 'cfeab6961dce'
|
||||
down_revision: str | Sequence[str] | None = 'f5b4a6d1325b'
|
||||
branch_labels: str | Sequence[str] | None = None
|
||||
depends_on: str | Sequence[str] | None = None
|
||||
|
||||
|
||||
def upgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetriohistoricaldata', schema=None) as batch_op:
|
||||
batch_op.alter_column(
|
||||
'api_type', existing_type=sa.VARCHAR(length=16), type_=sa.String(length=32), existing_nullable=False
|
||||
)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade(name: str = '') -> None:
|
||||
if name:
|
||||
return
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
logger.warning('新数据可能不支持降级!')
|
||||
logger.warning('请确认数据库内数据可以迁移到旧版本!')
|
||||
input('如果确认可以迁移, 请按回车键继续!')
|
||||
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetriohistoricaldata', schema=None) as batch_op:
|
||||
batch_op.alter_column(
|
||||
'api_type', existing_type=sa.String(length=32), type_=sa.VARCHAR(length=16), existing_nullable=False
|
||||
)
|
||||
|
||||
# ### end Alembic commands ###
|
||||
@@ -68,11 +68,11 @@ T = TypeVar('T', 'TETRIOHistoricalData', 'TOPHistoricalData', 'TOSHistoricalData
|
||||
lock = Lock()
|
||||
|
||||
|
||||
async def anti_duplicate_add(cls: type[T], model: T) -> None:
|
||||
async def anti_duplicate_add(model: T) -> None:
|
||||
async with lock, get_session() as session:
|
||||
result = (
|
||||
await session.scalars(
|
||||
select(cls)
|
||||
select(cls := model.__class__)
|
||||
.where(cls.update_time == model.update_time)
|
||||
.where(cls.user_unique_identifier == model.user_unique_identifier)
|
||||
.where(cls.api_type == model.api_type)
|
||||
|
||||
@@ -1,20 +1,21 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Generic, TypeVar
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..utils.typing import GameType
|
||||
|
||||
|
||||
class Base(BaseModel):
|
||||
platform: GameType
|
||||
T = TypeVar('T', bound=GameType)
|
||||
|
||||
|
||||
class BaseUser(ABC, Base):
|
||||
class BaseUser(BaseModel, ABC, Generic[T]):
|
||||
"""游戏用户"""
|
||||
|
||||
def __eq__(self, __value: object) -> bool:
|
||||
if isinstance(__value, BaseUser):
|
||||
return self.unique_identifier == __value.unique_identifier
|
||||
platform: T
|
||||
|
||||
def __eq__(self, other: object) -> bool:
|
||||
if isinstance(other, BaseUser):
|
||||
return self.unique_identifier == other.unique_identifier
|
||||
return False
|
||||
|
||||
@property
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from .player import Player
|
||||
from .schemas.user import User
|
||||
from .schemas.user_info import UserInfoSuccess
|
||||
from .schemas.user_records import UserRecordsSuccess
|
||||
from .tetra_league import full_export as tetra_league_full_export
|
||||
|
||||
__all__ = ['Player', 'User', 'UserInfoSuccess', 'UserRecordsSuccess', 'tetra_league_full_export']
|
||||
__all__ = ['Player', 'User', 'UserInfoSuccess', 'tetra_league_full_export']
|
||||
|
||||
@@ -6,25 +6,29 @@ from weakref import WeakValueDictionary
|
||||
from aiocache import Cache as ACache # type: ignore[import-untyped]
|
||||
from nonebot.compat import type_validate_json
|
||||
from nonebot.log import logger
|
||||
from yarl import URL
|
||||
|
||||
from ....config.config import config
|
||||
from ....utils.request import Request
|
||||
from .schemas.base import FailedModel, SuccessModel
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.tetrio or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Cache:
|
||||
cache = ACache(ACache.MEMORY)
|
||||
task: ClassVar[WeakValueDictionary[str, Lock]] = WeakValueDictionary()
|
||||
task: ClassVar[WeakValueDictionary[URL, Lock]] = WeakValueDictionary()
|
||||
|
||||
@classmethod
|
||||
async def get(cls, url: str) -> bytes:
|
||||
async def get(cls, url: URL) -> bytes:
|
||||
lock = cls.task.setdefault(url, Lock())
|
||||
async with lock:
|
||||
if (cached_data := await cls.cache.get(url)) is not None:
|
||||
logger.debug(f'{url}: Cache hit!')
|
||||
return cached_data
|
||||
response_data = await Request.request(url)
|
||||
response_data = await request.request(url)
|
||||
parsed_data: SuccessModel | FailedModel = type_validate_json(SuccessModel | FailedModel, response_data) # type: ignore[arg-type]
|
||||
if isinstance(parsed_data, SuccessModel):
|
||||
await cls.cache.add(
|
||||
|
||||
@@ -7,12 +7,12 @@ from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
|
||||
|
||||
from ....db.models import PydanticType
|
||||
from .schemas.base import SuccessModel
|
||||
from .typing import Summaries
|
||||
from .typing import Records, Summaries
|
||||
|
||||
|
||||
class TETRIOHistoricalData(MappedAsDataclass, Model):
|
||||
id: Mapped[int] = mapped_column(init=False, primary_key=True)
|
||||
user_unique_identifier: Mapped[str] = mapped_column(String(24), index=True)
|
||||
api_type: Mapped[Literal['User Info', Summaries]] = mapped_column(String(16), index=True)
|
||||
api_type: Mapped[Literal['User Info', Records, Summaries]] = mapped_column(String(32), index=True)
|
||||
data: Mapped[SuccessModel] = mapped_column(PydanticType(get_model=[SuccessModel.__subclasses__], models=set()))
|
||||
update_time: Mapped[datetime] = mapped_column(DateTime, index=True)
|
||||
|
||||
@@ -1,36 +1,61 @@
|
||||
from enum import Enum
|
||||
from types import MappingProxyType
|
||||
from typing import Literal, overload
|
||||
from typing import Literal, NamedTuple, cast, overload
|
||||
|
||||
from async_lru import alru_cache
|
||||
from nonebot.compat import type_validate_json
|
||||
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import splice_url
|
||||
from ..constant import BASE_URL, USER_ID, USER_NAME
|
||||
from .cache import Cache
|
||||
from .models import TETRIOHistoricalData
|
||||
from .schemas.base import FailedModel
|
||||
from .schemas.records.solo import Solo as SoloRecord
|
||||
from .schemas.records.solo import SoloSuccessModel as RecordsSoloSuccessModel
|
||||
from .schemas.summaries import (
|
||||
AchievementsSuccessModel,
|
||||
SoloSuccessModel,
|
||||
SummariesModel,
|
||||
ZenithSuccessModel,
|
||||
ZenSuccessModel,
|
||||
)
|
||||
from .schemas.summaries import (
|
||||
SoloSuccessModel as SummariesSoloSuccessModel,
|
||||
)
|
||||
from .schemas.summaries.base import User as SummariesUser
|
||||
from .schemas.summaries.league import LeagueSuccessModel
|
||||
from .schemas.user import User
|
||||
from .schemas.user_info import UserInfo, UserInfoSuccess
|
||||
from .typing import Summaries
|
||||
from .typing import Records, Summaries
|
||||
|
||||
|
||||
class RecordModeType(str, Enum):
|
||||
Sprint = '40l'
|
||||
Blitz = 'blitz'
|
||||
|
||||
|
||||
class RecordType(str, Enum):
|
||||
Top = 'top'
|
||||
Recent = 'recent'
|
||||
Progression = 'progression'
|
||||
|
||||
|
||||
class RecordKey(NamedTuple):
|
||||
mode_type: RecordModeType
|
||||
record_type: RecordType
|
||||
|
||||
def to_records(self) -> Records:
|
||||
return cast(Records, f'{self.mode_type.value}_{self.record_type.value}')
|
||||
|
||||
|
||||
class Player:
|
||||
__SUMMARIES_MAPPING: MappingProxyType[Summaries, type[SummariesModel]] = MappingProxyType(
|
||||
{
|
||||
'40l': SoloSuccessModel,
|
||||
'blitz': SoloSuccessModel,
|
||||
'40l': SummariesSoloSuccessModel,
|
||||
'blitz': SummariesSoloSuccessModel,
|
||||
'zenith': ZenithSuccessModel,
|
||||
'zenithex': ZenithSuccessModel,
|
||||
'league': LeagueSuccessModel,
|
||||
'zen': ZenSuccessModel,
|
||||
'achievements': AchievementsSuccessModel,
|
||||
}
|
||||
@@ -58,15 +83,11 @@ class Player:
|
||||
self.__user: User | None = None
|
||||
self._user_info: UserInfoSuccess | None = None
|
||||
self._summaries: dict[Summaries, SummariesModel] = {}
|
||||
self._records: dict[RecordKey, RecordsSoloSuccessModel] = {}
|
||||
|
||||
@property
|
||||
def _request_user_parameter(self) -> str:
|
||||
if self.user_id is not None:
|
||||
return self.user_id
|
||||
if self.user_name is not None:
|
||||
return self.user_name.lower()
|
||||
msg = 'Invalid user'
|
||||
raise ValueError(msg)
|
||||
return self.user_id or cast(str, self.user_name).lower()
|
||||
|
||||
@property
|
||||
async def user(self) -> User:
|
||||
@@ -90,14 +111,13 @@ class Player:
|
||||
async def get_info(self) -> UserInfoSuccess:
|
||||
"""Get User Info"""
|
||||
if self._user_info is None:
|
||||
raw_user_info = await Cache.get(splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}']))
|
||||
raw_user_info = await Cache.get(BASE_URL / 'users' / self._request_user_parameter)
|
||||
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
|
||||
if isinstance(user_info, FailedModel):
|
||||
msg = f'用户信息请求错误:\n{user_info.error}'
|
||||
raise RequestError(msg)
|
||||
self._user_info = user_info
|
||||
await anti_duplicate_add(
|
||||
TETRIOHistoricalData,
|
||||
TETRIOHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type='User Info',
|
||||
@@ -108,18 +128,20 @@ class Player:
|
||||
return self._user_info
|
||||
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['40l', 'blitz']) -> SoloSuccessModel: ...
|
||||
async def get_summaries(self, summaries_type: Literal['40l', 'blitz']) -> SummariesSoloSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['zenith', 'zenithex']) -> ZenithSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['zen']) -> ZenSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['league']) -> LeagueSuccessModel: ...
|
||||
@overload
|
||||
async def get_summaries(self, summaries_type: Literal['achievements']) -> AchievementsSuccessModel: ...
|
||||
|
||||
async def get_summaries(self, summaries_type: Summaries) -> SummariesModel:
|
||||
if summaries_type not in self._summaries:
|
||||
raw_summaries = await Cache.get(
|
||||
splice_url([BASE_URL, 'users/', f'{self._request_user_parameter}/', 'summaries/', summaries_type])
|
||||
BASE_URL / 'users' / self._request_user_parameter / 'summaries' / summaries_type
|
||||
)
|
||||
summaries: SummariesModel | FailedModel = type_validate_json(
|
||||
self.__SUMMARIES_MAPPING[summaries_type] | FailedModel, # type: ignore[arg-type]
|
||||
@@ -130,7 +152,6 @@ class Player:
|
||||
raise RequestError(msg)
|
||||
self._summaries[summaries_type] = summaries
|
||||
await anti_duplicate_add(
|
||||
TETRIOHistoricalData,
|
||||
TETRIOHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type=summaries_type,
|
||||
@@ -141,20 +162,21 @@ class Player:
|
||||
return self._summaries[summaries_type]
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def sprint(self) -> SoloSuccessModel:
|
||||
async def sprint(self) -> SummariesSoloSuccessModel:
|
||||
return await self.get_summaries('40l')
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def blitz(self) -> SoloSuccessModel:
|
||||
async def blitz(self) -> SummariesSoloSuccessModel:
|
||||
return await self.get_summaries('blitz')
|
||||
|
||||
@property
|
||||
@alru_cache
|
||||
async def zen(self) -> ZenSuccessModel:
|
||||
return await self.get_summaries('zen')
|
||||
|
||||
@property
|
||||
async def league(self) -> LeagueSuccessModel:
|
||||
return await self.get_summaries('league')
|
||||
|
||||
async def _get_local_summaries_user(self) -> SummariesUser | None:
|
||||
allow_summaries: set[Literal['40l', 'blitz', 'zenith', 'zenithex']] = {
|
||||
'40l',
|
||||
@@ -185,3 +207,23 @@ class Player:
|
||||
if (user := (await self._get_local_summaries_user())) is not None:
|
||||
return user.banner_revision
|
||||
return (await self.get_info()).data.banner_revision
|
||||
|
||||
async def get_records(self, mode_type: RecordModeType, records_type: RecordType) -> RecordsSoloSuccessModel:
|
||||
if (record_key := RecordKey(mode_type, records_type)) not in self._records:
|
||||
raw_records = await Cache.get(
|
||||
BASE_URL / 'users' / self._request_user_parameter / 'records' / mode_type / records_type,
|
||||
)
|
||||
records: RecordsSoloSuccessModel | FailedModel = type_validate_json(SoloRecord, raw_records) # type: ignore[arg-type]
|
||||
if isinstance(records, FailedModel):
|
||||
msg = f'用户Summaries数据请求错误:\n{records.error}'
|
||||
raise RequestError(msg)
|
||||
self._records[record_key] = records
|
||||
await anti_duplicate_add(
|
||||
TETRIOHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type=record_key.to_records(),
|
||||
data=records,
|
||||
update_time=records.cache.cached_at,
|
||||
),
|
||||
)
|
||||
return self._records[record_key]
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class P(BaseModel): # what is P
|
||||
pri: float
|
||||
sec: float
|
||||
ter: float
|
||||
|
||||
|
||||
class Cache(BaseModel):
|
||||
status: str
|
||||
cached_at: datetime
|
||||
cached_until: datetime
|
||||
|
||||
|
||||
class SuccessModel(BaseModel):
|
||||
success: Literal[True]
|
||||
cache: Cache
|
||||
|
||||
|
||||
class FailedModel(BaseModel):
|
||||
success: Literal[False]
|
||||
error: str
|
||||
@@ -0,0 +1,61 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class AggregateStats(BaseModel):
|
||||
apm: float
|
||||
pps: float
|
||||
vsscore: float
|
||||
|
||||
|
||||
class Finesse(BaseModel):
|
||||
combo: int
|
||||
faults: int
|
||||
perfectpieces: int
|
||||
|
||||
|
||||
class Clears(BaseModel):
|
||||
singles: int
|
||||
doubles: int
|
||||
triples: int
|
||||
quads: int
|
||||
realtspins: int
|
||||
minitspins: int
|
||||
minitspinsingles: int
|
||||
tspinsingles: int
|
||||
minitspindoubles: int
|
||||
tspindoubles: int
|
||||
tspintriples: int
|
||||
tspinquads: int
|
||||
allclear: int
|
||||
|
||||
|
||||
class Garbage(BaseModel):
|
||||
sent: int
|
||||
received: int
|
||||
attack: int | None
|
||||
cleared: int
|
||||
|
||||
|
||||
class P(BaseModel): # what is P
|
||||
pri: float
|
||||
sec: float
|
||||
ter: float
|
||||
|
||||
|
||||
class Cache(BaseModel):
|
||||
status: str
|
||||
cached_at: datetime
|
||||
cached_until: datetime
|
||||
|
||||
|
||||
class SuccessModel(BaseModel):
|
||||
success: Literal[True]
|
||||
cache: Cache
|
||||
|
||||
|
||||
class FailedModel(BaseModel):
|
||||
success: Literal[False]
|
||||
error: str
|
||||
@@ -0,0 +1,65 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ..base import P
|
||||
from . import AggregateStats, Clears, Finesse, Garbage
|
||||
|
||||
|
||||
class Time(BaseModel):
|
||||
start: int
|
||||
zero: bool
|
||||
locked: bool
|
||||
prev: int
|
||||
frameoffset: int
|
||||
|
||||
|
||||
class Stats(BaseModel):
|
||||
seed: int | None = None # ?: 不知道是之后都没有了还是还会有
|
||||
lines: int
|
||||
level_lines: int
|
||||
level_lines_needed: int
|
||||
inputs: int
|
||||
holds: int = 0
|
||||
time: Time | None = None # ?: 不知道是之后都没有了还是还会有
|
||||
score: int
|
||||
zenlevel: int
|
||||
zenprogress: int
|
||||
level: int
|
||||
combo: int
|
||||
currentcombopower: int | None = None
|
||||
topcombo: int
|
||||
btb: int
|
||||
topbtb: int
|
||||
currentbtbchainpower: int | None = None
|
||||
tspins: int
|
||||
piecesplaced: int
|
||||
clears: Clears
|
||||
garbage: Garbage
|
||||
kills: int
|
||||
finesse: Finesse
|
||||
finaltime: float
|
||||
|
||||
|
||||
class Results(BaseModel):
|
||||
aggregatestats: AggregateStats
|
||||
stats: Stats
|
||||
gameoverreason: str
|
||||
|
||||
|
||||
class Record(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
replayid: str
|
||||
stub: bool
|
||||
gamemode: Literal['40l', 'blitz']
|
||||
pb: bool
|
||||
oncepb: bool
|
||||
ts: datetime
|
||||
revolution: None
|
||||
otherusers: list
|
||||
leaderboards: list[str]
|
||||
results: Results
|
||||
extras: dict
|
||||
disputed: bool
|
||||
p: P
|
||||
@@ -0,0 +1,17 @@
|
||||
from typing import TypeAlias
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..base import FailedModel, SuccessModel
|
||||
from ..base.solo import Record
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
entries: list[Record]
|
||||
|
||||
|
||||
class SoloSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
Solo: TypeAlias = SoloSuccessModel | FailedModel
|
||||
@@ -1,20 +1,21 @@
|
||||
from .achievements import Achievements, AchievementsSuccessModel
|
||||
from .solo import Blitz, SoloSuccessModel, Sprint
|
||||
from .league import LeagueSuccessModel
|
||||
from .solo import Solo, SoloSuccessModel
|
||||
from .zen import Zen, ZenSuccessModel
|
||||
from .zenith import Zenith, ZenithEx, ZenithSuccessModel
|
||||
|
||||
SummariesModel = AchievementsSuccessModel | SoloSuccessModel | ZenSuccessModel | ZenithSuccessModel
|
||||
SummariesModel = AchievementsSuccessModel | SoloSuccessModel | ZenSuccessModel | LeagueSuccessModel | ZenithSuccessModel
|
||||
|
||||
__all__ = [
|
||||
'Achievements',
|
||||
'AchievementsSuccessModel',
|
||||
'Blitz',
|
||||
'Sprint',
|
||||
'LeagueSuccessModel',
|
||||
'Solo',
|
||||
'SoloSuccessModel',
|
||||
'SummariesModel',
|
||||
'Zen',
|
||||
'ZenSuccessModel',
|
||||
'Zenith',
|
||||
'ZenithEx',
|
||||
'ZenithSuccessModel',
|
||||
'SummariesModel',
|
||||
'ZenSuccessModel',
|
||||
]
|
||||
|
||||
@@ -7,17 +7,5 @@ class User(BaseModel):
|
||||
avatar_revision: int | None
|
||||
banner_revision: int | None
|
||||
country: str | None
|
||||
verified: int
|
||||
verified: int | None = None
|
||||
supporter: int
|
||||
|
||||
|
||||
class AggregateStats(BaseModel):
|
||||
apm: float
|
||||
pps: float
|
||||
vsscore: float
|
||||
|
||||
|
||||
class Finesse(BaseModel):
|
||||
combo: int
|
||||
faults: int
|
||||
perfectpieces: int
|
||||
|
||||
@@ -0,0 +1,102 @@
|
||||
from typing import Literal
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ...typing import Rank, S1Rank, S1ValidRank
|
||||
from ..base import SuccessModel
|
||||
|
||||
|
||||
class PastInner(BaseModel):
|
||||
season: str
|
||||
username: str
|
||||
country: str | None = None
|
||||
placement: int | None = None
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
glicko: float
|
||||
gxe: float
|
||||
tr: float
|
||||
rd: float
|
||||
rank: S1Rank
|
||||
bestrank: S1ValidRank
|
||||
ranked: bool
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
|
||||
|
||||
class Past(BaseModel):
|
||||
first: PastInner | None = Field(default=None, alias='1')
|
||||
|
||||
|
||||
class BaseData(BaseModel):
|
||||
decaying: bool
|
||||
past: Past
|
||||
|
||||
|
||||
class NeverPlayedData(BaseData):
|
||||
gamesplayed: Literal[0]
|
||||
gameswon: Literal[0]
|
||||
glicko: Literal[-1]
|
||||
rd: Literal[-1]
|
||||
gxe: Literal[-1]
|
||||
tr: Literal[-1]
|
||||
rank: Literal['z']
|
||||
apm: None = None
|
||||
pps: None = None
|
||||
vs: None = None
|
||||
standing: Literal[-1]
|
||||
standing_local: Literal[-1]
|
||||
prev_rank: None
|
||||
prev_at: Literal[-1]
|
||||
next_rank: None
|
||||
next_at: Literal[-1]
|
||||
percentile: Literal[-1]
|
||||
percentile_rank: Literal['z']
|
||||
|
||||
|
||||
class NeverRatedData(BaseData):
|
||||
gamesplayed: Literal[1, 2, 3, 4, 5, 6, 7, 8, 9]
|
||||
gameswon: int
|
||||
glicko: Literal[-1]
|
||||
rd: Literal[-1]
|
||||
gxe: Literal[-1]
|
||||
tr: Literal[-1]
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
rank: Literal['z']
|
||||
standing: Literal[-1]
|
||||
standing_local: Literal[-1]
|
||||
prev_rank: None
|
||||
prev_at: Literal[-1]
|
||||
next_rank: None
|
||||
next_at: Literal[-1]
|
||||
percentile: Literal[-1]
|
||||
percentile_rank: Literal['z']
|
||||
|
||||
|
||||
class RatedData(BaseData):
|
||||
gamesplayed: int
|
||||
gameswon: int
|
||||
glicko: float
|
||||
rd: float
|
||||
gxe: float
|
||||
tr: float
|
||||
rank: Rank
|
||||
bestrank: Rank
|
||||
standing: int
|
||||
apm: float
|
||||
pps: float
|
||||
vs: float
|
||||
standing_local: int
|
||||
prev_rank: Rank | None = None
|
||||
prev_at: int
|
||||
next_rank: Rank | None = None
|
||||
next_at: int
|
||||
percentile: float
|
||||
percentile_rank: str
|
||||
|
||||
|
||||
class LeagueSuccessModel(SuccessModel):
|
||||
data: NeverPlayedData | NeverRatedData | RatedData
|
||||
@@ -1,92 +1,14 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal, TypeAlias
|
||||
from typing import TypeAlias
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ..base import FailedModel, P, SuccessModel
|
||||
from .base import AggregateStats, Finesse, User
|
||||
from ..base import FailedModel, SuccessModel
|
||||
from ..base.solo import Record as BaseRecord
|
||||
from .base import User
|
||||
|
||||
|
||||
class Time(BaseModel):
|
||||
start: int
|
||||
zero: bool
|
||||
locked: bool
|
||||
prev: int
|
||||
frameoffset: int
|
||||
|
||||
|
||||
class Clears(BaseModel):
|
||||
singles: int
|
||||
doubles: int
|
||||
triples: int
|
||||
quads: int
|
||||
realtspins: int
|
||||
minitspins: int
|
||||
minitspinsingles: int
|
||||
tspinsingles: int
|
||||
minitspindoubles: int
|
||||
tspindoubles: int
|
||||
tspintriples: int
|
||||
tspinquads: int
|
||||
allclear: int
|
||||
|
||||
|
||||
class Garbage(BaseModel):
|
||||
sent: int
|
||||
received: int
|
||||
attack: int | None
|
||||
cleared: int
|
||||
|
||||
|
||||
class Stats(BaseModel):
|
||||
seed: int | None = None # ?: 不知道是之后都没有了还是还会有
|
||||
lines: int
|
||||
level_lines: int
|
||||
level_lines_needed: int
|
||||
inputs: int
|
||||
holds: int
|
||||
time: Time | None = None # ?: 不知道是之后都没有了还是还会有
|
||||
score: int
|
||||
zenlevel: int
|
||||
zenprogress: int
|
||||
level: int
|
||||
combo: int
|
||||
currentcombopower: int | None = None
|
||||
topcombo: int
|
||||
btb: int
|
||||
topbtb: int
|
||||
currentbtbchainpower: int | None = None
|
||||
tspins: int
|
||||
piecesplaced: int
|
||||
clears: Clears
|
||||
garbage: Garbage
|
||||
kills: int
|
||||
finesse: Finesse
|
||||
finaltime: float
|
||||
|
||||
|
||||
class Results(BaseModel):
|
||||
aggregatestats: AggregateStats
|
||||
stats: Stats
|
||||
gameoverreason: str
|
||||
|
||||
|
||||
class Record(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
replayid: str
|
||||
stub: bool
|
||||
gamemode: Literal['40l', 'blitz']
|
||||
pb: bool
|
||||
oncepb: bool
|
||||
ts: datetime
|
||||
revolution: None
|
||||
class Record(BaseRecord):
|
||||
user: User
|
||||
otherusers: list
|
||||
leaderboards: list[str]
|
||||
results: Results
|
||||
extras: dict
|
||||
disputed: bool
|
||||
p: P
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
@@ -99,5 +21,4 @@ class SoloSuccessModel(SuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
Sprint: TypeAlias = SoloSuccessModel | FailedModel
|
||||
Blitz: TypeAlias = SoloSuccessModel | FailedModel
|
||||
Solo: TypeAlias = SoloSuccessModel | FailedModel
|
||||
|
||||
@@ -3,38 +3,23 @@ from typing import Literal, TypeAlias
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from ..base import FailedModel, P, SuccessModel
|
||||
from .base import AggregateStats, Finesse, User
|
||||
from ..base import AggregateStats, FailedModel, Finesse, P, SuccessModel
|
||||
from ..base import Clears as BaseClears
|
||||
from ..base import Garbage as BaseGarbage
|
||||
from .base import User
|
||||
|
||||
|
||||
class Clears(BaseModel):
|
||||
singles: int
|
||||
doubles: int
|
||||
triples: int
|
||||
quads: int
|
||||
class Clears(BaseClears):
|
||||
pentas: int
|
||||
realtspins: int
|
||||
minitspins: int
|
||||
minitspinsingles: int
|
||||
tspinsingles: int
|
||||
minitspindoubles: int
|
||||
tspindoubles: int
|
||||
minitspintriples: int
|
||||
tspintriples: int
|
||||
minitspinquads: int
|
||||
tspinquads: int
|
||||
tspinpentas: int
|
||||
allclear: int
|
||||
|
||||
|
||||
class Garbage(BaseModel):
|
||||
sent: int
|
||||
class Garbage(BaseGarbage):
|
||||
sent_nomult: int
|
||||
maxspike: int
|
||||
maxspike_nomult: int
|
||||
received: int
|
||||
attack: int
|
||||
cleared: int
|
||||
|
||||
|
||||
class _Zenith(BaseModel):
|
||||
|
||||
@@ -6,7 +6,7 @@ from ....schemas import BaseUser
|
||||
from ...constant import GAME_TYPE
|
||||
|
||||
|
||||
class User(BaseUser):
|
||||
class User(BaseUser[Literal['IO']]):
|
||||
platform: Literal['IO'] = GAME_TYPE
|
||||
|
||||
ID: str
|
||||
|
||||
@@ -42,7 +42,7 @@ class Data(BaseModel):
|
||||
badstanding: bool | None = None
|
||||
supporter: bool | None = None # osk说是必有, 但实际上不是 fkosk
|
||||
supporter_tier: int
|
||||
verified: bool
|
||||
verified: bool | None = None
|
||||
avatar_revision: int | None = None
|
||||
"""This user's avatar ID. Get their avatar at
|
||||
|
||||
|
||||
@@ -1,122 +0,0 @@
|
||||
from datetime import datetime
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .....utils.typing import Number
|
||||
from .base import FailedModel
|
||||
from .base import SuccessModel as BaseSuccessModel
|
||||
|
||||
|
||||
class Time(BaseModel):
|
||||
start: int
|
||||
zero: bool
|
||||
locked: bool
|
||||
prev: int
|
||||
frameoffset: int | None = None
|
||||
|
||||
|
||||
class Clears(BaseModel):
|
||||
singles: int
|
||||
doubles: int
|
||||
triples: int
|
||||
quads: int
|
||||
pentas: int | None = None
|
||||
realtspins: int
|
||||
minitspins: int
|
||||
minitspinsingles: int
|
||||
tspinsingles: int
|
||||
minitspindoubles: int
|
||||
tspindoubles: int
|
||||
tspintriples: int
|
||||
tspinquads: int
|
||||
allclear: int
|
||||
|
||||
|
||||
class Garbage(BaseModel):
|
||||
sent: int
|
||||
received: int
|
||||
attack: int | None = None
|
||||
cleared: int | None = None
|
||||
|
||||
|
||||
class Finesse(BaseModel):
|
||||
combo: int
|
||||
faults: int
|
||||
perfectpieces: int
|
||||
|
||||
|
||||
class EndContext(BaseModel):
|
||||
seed: Number
|
||||
lines: int
|
||||
level_lines: int
|
||||
level_lines_needed: int
|
||||
inputs: int
|
||||
holds: int | None = None
|
||||
time: Time
|
||||
score: int
|
||||
zenlevel: int | None = None
|
||||
zenprogress: int | None = None
|
||||
level: int
|
||||
combo: int
|
||||
currentcombopower: int | None = None # WTF
|
||||
topcombo: int
|
||||
btb: int
|
||||
topbtb: int
|
||||
currentbtbchainpower: int | None = None # WTF * 2
|
||||
tspins: int
|
||||
piecesplaced: int
|
||||
clears: Clears
|
||||
garbage: Garbage
|
||||
kills: int
|
||||
finesse: Finesse
|
||||
final_time: float = Field(..., alias='finalTime')
|
||||
gametype: str
|
||||
|
||||
|
||||
class _User(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
username: str
|
||||
|
||||
|
||||
class _Record(BaseModel):
|
||||
id: str = Field(..., alias='_id')
|
||||
stream: str
|
||||
replayid: str
|
||||
user: _User
|
||||
ts: datetime
|
||||
ismulti: bool | None = None
|
||||
|
||||
|
||||
class SoloRecord(_Record):
|
||||
endcontext: EndContext
|
||||
|
||||
|
||||
class MultiRecord(_Record):
|
||||
endcontext: list[EndContext]
|
||||
|
||||
|
||||
class SoloModeRecord(BaseModel):
|
||||
record: SoloRecord | None = None
|
||||
rank: int | None = None
|
||||
|
||||
|
||||
class Records(BaseModel):
|
||||
sprint: SoloModeRecord = Field(..., alias='40l')
|
||||
blitz: SoloModeRecord
|
||||
|
||||
|
||||
class Zen(BaseModel):
|
||||
level: int
|
||||
score: int
|
||||
|
||||
|
||||
class Data(BaseModel):
|
||||
records: Records
|
||||
zen: Zen
|
||||
|
||||
|
||||
class UserRecordsSuccess(BaseSuccessModel):
|
||||
data: Data
|
||||
|
||||
|
||||
UserRecords = UserRecordsSuccess | FailedModel
|
||||
@@ -1,27 +1,26 @@
|
||||
from typing import Literal, NamedTuple, TypedDict, overload
|
||||
from urllib.parse import urlencode
|
||||
from typing import Literal, NamedTuple, overload
|
||||
|
||||
from msgspec import Struct, to_builtins
|
||||
from nonebot.compat import type_validate_json
|
||||
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import splice_url
|
||||
from ..constant import BASE_URL
|
||||
from .cache import Cache
|
||||
from .schemas.base import FailedModel
|
||||
from .schemas.tetra_league import TetraLeague, TetraLeagueSuccess
|
||||
|
||||
|
||||
class Parameter(TypedDict, total=False):
|
||||
after: float
|
||||
before: float
|
||||
limit: int
|
||||
country: str
|
||||
class Parameter(Struct, omit_defaults=True):
|
||||
after: float | None = None
|
||||
before: float | None = None
|
||||
limit: int | None = None
|
||||
country: str | None = None
|
||||
|
||||
|
||||
async def leaderboard(parameter: Parameter | None = None) -> TetraLeagueSuccess:
|
||||
league: TetraLeague = type_validate_json(
|
||||
TetraLeague, # type: ignore[arg-type]
|
||||
(await Cache.get(splice_url([BASE_URL, 'users/lists/league', f'?{urlencode(parameter or {})}']))),
|
||||
(await Cache.get(BASE_URL / 'users/lists/league' % to_builtins(parameter))),
|
||||
)
|
||||
if isinstance(league, FailedModel):
|
||||
msg = f'排行榜数据请求错误:\n{league.error}'
|
||||
@@ -45,8 +44,9 @@ async def full_export(*, with_original: Literal[True]) -> FullExport: ...
|
||||
async def full_export(*, with_original: bool) -> TetraLeagueSuccess | FullExport:
|
||||
full: TetraLeague = type_validate_json(
|
||||
TetraLeague, # type: ignore[arg-type]
|
||||
(data := await Cache.get(splice_url([BASE_URL, 'users/lists/league/all']))),
|
||||
(data := await Cache.get(BASE_URL / 'users/lists/league/all')),
|
||||
)
|
||||
|
||||
if isinstance(full, FailedModel):
|
||||
msg = f'排行榜数据请求错误:\n{full.error}'
|
||||
raise RequestError(msg)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
from typing import Literal
|
||||
|
||||
ValidRank = Literal[
|
||||
S1ValidRank = Literal[
|
||||
'x+',
|
||||
'x',
|
||||
'u',
|
||||
'ss',
|
||||
@@ -19,7 +20,9 @@ ValidRank = Literal[
|
||||
'd+',
|
||||
'd',
|
||||
]
|
||||
S1Rank = S1ValidRank | Literal['z']
|
||||
|
||||
ValidRank = Literal['x+'] | S1ValidRank
|
||||
Rank = ValidRank | Literal['z'] # 未定级
|
||||
|
||||
Summaries = Literal[
|
||||
@@ -27,7 +30,16 @@ Summaries = Literal[
|
||||
'blitz',
|
||||
'zenith',
|
||||
'zenithex',
|
||||
# 'league', # 等待正式赛季开始
|
||||
'league',
|
||||
'zen',
|
||||
'achievements',
|
||||
]
|
||||
|
||||
Records = Literal[
|
||||
'40l_top',
|
||||
'40l_recent',
|
||||
'40l_progression',
|
||||
'blitz_top',
|
||||
'blitz_recent',
|
||||
'blitz_progression',
|
||||
]
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from arclet.alconna import Arg, ArgFlag
|
||||
from nonebot_plugin_alconna import Args, Subcommand
|
||||
@@ -9,6 +8,7 @@ from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import User
|
||||
from nonebot_plugin_userinfo import BotUserInfo, UserInfo
|
||||
from yarl import URL
|
||||
|
||||
from ...db import BindStatus, create_or_update_bind, trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
@@ -67,7 +67,10 @@ async def _(nb_user: User, account: Player, event_session: EventSession, bot_inf
|
||||
platform='TETR.IO',
|
||||
status='unknown',
|
||||
user=People(
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}')
|
||||
% {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await account.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(type='identicon', hash=md5(user.ID.encode()).hexdigest()), # noqa: S324
|
||||
name=user.name.upper(),
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
from .api.typing import ValidRank
|
||||
|
||||
GAME_TYPE: Literal['IO'] = 'IO'
|
||||
|
||||
BASE_URL = 'https://ch.tetr.io/api/'
|
||||
BASE_URL = URL('https://ch.tetr.io/api/')
|
||||
|
||||
RANK_PERCENTILE: dict[ValidRank, float] = {
|
||||
'x': 1,
|
||||
|
||||
@@ -2,7 +2,6 @@ from asyncio import gather
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from hashlib import md5
|
||||
from typing import TYPE_CHECKING, TypeVar
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from arclet.alconna import Arg, ArgFlag
|
||||
from nonebot import get_driver
|
||||
@@ -16,12 +15,22 @@ from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[im
|
||||
from nonebot_plugin_user import User as NBUser
|
||||
from nonebot_plugin_user import get_user
|
||||
from sqlalchemy import select
|
||||
from yarl import URL
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.schemas.base import Avatar
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import Badge, Blitz, Sprint, Statistic, Zen
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import (
|
||||
Badge,
|
||||
Blitz,
|
||||
Sprint,
|
||||
Statistic,
|
||||
TetraLeague,
|
||||
TetraLeagueStatistic,
|
||||
Zen,
|
||||
)
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import Info as V2TemplateInfo
|
||||
from ...utils.render.schemas.tetrio.user.info_v2 import User as V2TemplateUser
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -30,6 +39,7 @@ from .. import add_block_handlers, alc
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import command, get_player
|
||||
from .api import Player
|
||||
from .api.schemas.summaries.league import LeagueSuccessModel, NeverPlayedData, NeverRatedData
|
||||
from .constant import GAME_TYPE
|
||||
from .models import TETRIOUserConfig
|
||||
from .typing import Template
|
||||
@@ -146,15 +156,17 @@ def handling_special_value(value: N) -> N | None:
|
||||
async def make_query_image_v2(player: Player) -> bytes:
|
||||
user: User
|
||||
user_info: UserInfoSuccess
|
||||
league: LeagueSuccessModel
|
||||
sprint: SoloSuccessModel
|
||||
blitz: SoloSuccessModel
|
||||
zen: ZenSuccessModel
|
||||
avatar_revision: int | None
|
||||
banner_revision: int | None
|
||||
# [todo) 有没有什么办法能让这类型推导成功)
|
||||
user, user_info, sprint, blitz, zen, avatar_revision, banner_revision = await gather( # type: ignore[assignment]
|
||||
# TODO)) 有没有什么办法能让这类型推导成功)
|
||||
user, user_info, league, sprint, blitz, zen, avatar_revision, banner_revision = await gather( # type: ignore[assignment]
|
||||
player.user,
|
||||
player.get_info(),
|
||||
player.league,
|
||||
player.sprint,
|
||||
player.blitz,
|
||||
player.zen,
|
||||
@@ -187,10 +199,14 @@ async def make_query_image_v2(player: Player) -> bytes:
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
bio=user_info.data.bio,
|
||||
banner=f'http://{netloc}/host/resource/tetrio/banners/{user.ID}?{urlencode({"revision": banner_revision})}'
|
||||
banner=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/banners/{user.ID}') % {'revision': banner_revision}
|
||||
)
|
||||
if banner_revision is not None and banner_revision != 0
|
||||
else None,
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if avatar_revision is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
@@ -211,11 +227,29 @@ async def make_query_image_v2(player: Player) -> bytes:
|
||||
friend_count=user_info.data.friend_count,
|
||||
supporter_tier=user_info.data.supporter_tier,
|
||||
bad_standing=user_info.data.badstanding or False,
|
||||
verified=user_info.data.verified,
|
||||
verified=user_info.data.verified or False,
|
||||
playtime=play_time,
|
||||
join_at=user_info.data.ts,
|
||||
),
|
||||
tetra_league=None,
|
||||
tetra_league=TetraLeague(
|
||||
rank=league.data.rank,
|
||||
highest_rank='z' if isinstance(league.data, NeverRatedData) else league.data.bestrank,
|
||||
tr=round(league.data.tr, 2),
|
||||
glicko=round(league.data.glicko, 2),
|
||||
rd=round(league.data.rd, 2),
|
||||
global_rank=league.data.standing,
|
||||
country_rank=league.data.standing_local,
|
||||
pps=(metrics := get_metrics(pps=league.data.pps, apm=league.data.apm, vs=league.data.vs)).pps,
|
||||
apm=metrics.apm,
|
||||
apl=metrics.apl,
|
||||
vs=metrics.vs,
|
||||
adpl=metrics.adpl,
|
||||
statistic=TetraLeagueStatistic(total=league.data.gamesplayed, wins=league.data.gameswon),
|
||||
decaying=league.data.decaying,
|
||||
history=None,
|
||||
)
|
||||
if not isinstance(league.data, NeverPlayedData)
|
||||
else None,
|
||||
statistic=Statistic(
|
||||
total=handling_special_value(user_info.data.gamesplayed),
|
||||
wins=handling_special_value(user_info.data.gameswon),
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from asyncio import gather
|
||||
from datetime import timedelta
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
@@ -11,6 +10,7 @@ from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user
|
||||
from yarl import URL
|
||||
|
||||
from ....db import query_bind_info, trigger
|
||||
from ....utils.exception import RecordNotFoundError
|
||||
@@ -94,7 +94,9 @@ async def make_blitz_image(player: Player) -> bytes:
|
||||
user=User(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await player.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from asyncio import gather
|
||||
from datetime import timedelta
|
||||
from hashlib import md5
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from nonebot.adapters import Event
|
||||
from nonebot.matcher import Matcher
|
||||
@@ -11,6 +10,7 @@ from nonebot_plugin_orm import get_session
|
||||
from nonebot_plugin_session import EventSession
|
||||
from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped]
|
||||
from nonebot_plugin_user import get_user
|
||||
from yarl import URL
|
||||
|
||||
from ....db import query_bind_info, trigger
|
||||
from ....utils.exception import RecordNotFoundError
|
||||
@@ -95,7 +95,9 @@ async def make_sprint_image(player: Player) -> bytes:
|
||||
user=User(
|
||||
id=user.ID,
|
||||
name=user.name.upper(),
|
||||
avatar=f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}?{urlencode({"revision": avatar_revision})}'
|
||||
avatar=str(
|
||||
URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision}
|
||||
)
|
||||
if (avatar_revision := (await player.avatar_revision)) is not None and avatar_revision != 0
|
||||
else Avatar(
|
||||
type='identicon',
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
from contextlib import suppress
|
||||
from datetime import datetime, timezone
|
||||
from io import StringIO
|
||||
from urllib.parse import urlencode
|
||||
|
||||
from lxml import etree
|
||||
from pandas import read_html
|
||||
|
||||
from ....config.config import config
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.request import Request, splice_url
|
||||
from ....utils.request import Request
|
||||
from ..constant import BASE_URL, USER_NAME
|
||||
from .models import TOPHistoricalData
|
||||
from .schemas.user import User
|
||||
@@ -15,6 +15,8 @@ from .schemas.user_profile import Data, UserProfile
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.top or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Player:
|
||||
def __init__(self, *, user_name: str, trust: bool = False) -> None:
|
||||
@@ -35,11 +37,9 @@ class Player:
|
||||
async def get_profile(self) -> UserProfile:
|
||||
"""获取用户信息"""
|
||||
if self._user_profile is None:
|
||||
url = splice_url([BASE_URL, 'profile.php', f'?{urlencode({"user":self.user_name})}'])
|
||||
raw_user_profile = await Request.request(url, is_json=False)
|
||||
raw_user_profile = await request.request(BASE_URL / 'profile.php' % {'user': self.user_name}, is_json=False)
|
||||
self._user_profile = self._parse_profile(raw_user_profile)
|
||||
await anti_duplicate_add(
|
||||
TOPHistoricalData,
|
||||
TOPHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type='User Profile',
|
||||
@@ -49,7 +49,8 @@ class Player:
|
||||
)
|
||||
return self._user_profile
|
||||
|
||||
def _parse_profile(self, original_user_profile: bytes) -> UserProfile:
|
||||
@staticmethod
|
||||
def _parse_profile(original_user_profile: bytes) -> UserProfile:
|
||||
html = etree.HTML(original_user_profile)
|
||||
user_name = html.xpath('//div[@class="mycontent"]/h1/text()')[0].replace("'s profile", '')
|
||||
today = None
|
||||
@@ -68,4 +69,4 @@ class Player:
|
||||
total: list[Data] = []
|
||||
for _, value in dataframe.iterrows():
|
||||
total.append(Data(lpm=value['lpm'], apm=value['apm']))
|
||||
return UserProfile(user_name=user_name, today=today, total=total)
|
||||
return UserProfile(user_name=user_name, today=today, total=total or None)
|
||||
|
||||
@@ -6,7 +6,7 @@ from ....schemas import BaseUser
|
||||
from ...constant import GAME_TYPE
|
||||
|
||||
|
||||
class User(BaseUser):
|
||||
class User(BaseUser[Literal['TOP']]):
|
||||
platform: Literal['TOP'] = GAME_TYPE
|
||||
|
||||
user_name: str
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
GAME_TYPE: Literal['TOP'] = 'TOP'
|
||||
|
||||
BASE_URL = 'http://tetrisonline.pl/top/'
|
||||
BASE_URL = URL('http://tetrisonline.pl/top/')
|
||||
|
||||
USER_NAME = compile(r'^[a-zA-Z0-9_]{1,16}$')
|
||||
|
||||
@@ -8,12 +8,20 @@ from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[im
|
||||
from nonebot_plugin_user import get_user
|
||||
|
||||
from ...db import query_bind_info, trigger
|
||||
from ...utils.metrics import get_metrics
|
||||
from ...utils.exception import FallbackError
|
||||
from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.metrics import TetrisMetricsBasicWithLPM, get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.avatar import get_avatar
|
||||
from ...utils.render.schemas.base import People
|
||||
from ...utils.render.schemas.top_info import Data as InfoData
|
||||
from ...utils.render.schemas.top_info import Info
|
||||
from ...utils.screenshot import screenshot
|
||||
from ...utils.typing import Me
|
||||
from ..constant import CANT_VERIFY_MESSAGE
|
||||
from . import alc
|
||||
from .api import Player
|
||||
from .api.schemas.user_profile import UserProfile
|
||||
from .api.schemas.user_profile import Data, UserProfile
|
||||
from .constant import GAME_TYPE
|
||||
|
||||
|
||||
@@ -35,8 +43,10 @@ async def _(event: Event, matcher: Matcher, target: At | Me, event_session: Even
|
||||
)
|
||||
if bind is None:
|
||||
await matcher.finish('未查询到绑定信息')
|
||||
message = CANT_VERIFY_MESSAGE
|
||||
await (message + make_query_text(await Player(user_name=bind.game_account, trust=True).get_profile())).finish()
|
||||
await (
|
||||
UniMessage(CANT_VERIFY_MESSAGE)
|
||||
+ await make_query_result(await Player(user_name=bind.game_account, trust=True).get_profile())
|
||||
).finish()
|
||||
|
||||
|
||||
@alc.assign('TOP.query')
|
||||
@@ -47,7 +57,34 @@ async def _(account: Player, event_session: EventSession):
|
||||
command_type='query',
|
||||
command_args=[],
|
||||
):
|
||||
await (make_query_text(await account.get_profile())).finish()
|
||||
await (await make_query_result(await account.get_profile())).finish()
|
||||
|
||||
|
||||
def get_avg_metrics(data: list[Data]) -> TetrisMetricsBasicWithLPM:
|
||||
total_lpm = total_apm = 0.0
|
||||
for value in data:
|
||||
total_lpm += value.lpm
|
||||
total_apm += value.apm
|
||||
num = len(data)
|
||||
return get_metrics(lpm=total_lpm / num, apm=total_apm / num)
|
||||
|
||||
|
||||
async def make_query_image(profile: UserProfile) -> bytes:
|
||||
if profile.today is None or profile.total is None:
|
||||
raise FallbackError
|
||||
today = get_metrics(lpm=profile.today.lpm, apm=profile.today.apm)
|
||||
history = get_avg_metrics(profile.total)
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v1/top/info',
|
||||
Info(
|
||||
user=People(avatar=get_avatar(profile.user_name), name=profile.user_name),
|
||||
today=InfoData(pps=today.pps, lpm=today.lpm, apm=today.apm, apl=today.apl),
|
||||
history=InfoData(pps=history.pps, lpm=history.lpm, apm=history.apm, apl=history.apl),
|
||||
),
|
||||
)
|
||||
) as page_hash:
|
||||
return await screenshot(f'http://{get_self_netloc()}/host/{page_hash}.html')
|
||||
|
||||
|
||||
def make_query_text(profile: UserProfile) -> UniMessage:
|
||||
@@ -60,15 +97,18 @@ def make_query_text(profile: UserProfile) -> UniMessage:
|
||||
else:
|
||||
message += f'用户 {profile.user_name} 暂无24小时内统计数据'
|
||||
if profile.total is not None:
|
||||
total_lpm = total_apm = 0.0
|
||||
for value in profile.total:
|
||||
total_lpm += value.lpm
|
||||
total_apm += value.apm
|
||||
num = len(profile.total)
|
||||
total = get_metrics(lpm=total_lpm / num, apm=total_apm / num)
|
||||
total = get_avg_metrics(profile.total)
|
||||
message += '\n历史统计数据为: '
|
||||
message += f"\nL'PM: {total.lpm} ( {total.pps} pps )"
|
||||
message += f'\nAPM: {total.apm} ( x{total.apl} )'
|
||||
else:
|
||||
message += '\n暂无历史统计数据'
|
||||
return UniMessage(message)
|
||||
|
||||
|
||||
async def make_query_result(profile: UserProfile) -> UniMessage:
|
||||
try:
|
||||
return UniMessage.image(raw=await make_query_image(profile))
|
||||
except FallbackError:
|
||||
...
|
||||
return make_query_text(profile)
|
||||
|
||||
@@ -1,13 +1,14 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import overload
|
||||
from urllib.parse import urlencode
|
||||
from typing import cast, overload
|
||||
|
||||
from httpx import TimeoutException
|
||||
from nonebot.compat import type_validate_json
|
||||
from yarl import URL
|
||||
|
||||
from ....config.config import config
|
||||
from ....db import anti_duplicate_add
|
||||
from ....utils.exception import RequestError
|
||||
from ....utils.request import Request, splice_url
|
||||
from ....utils.request import Request
|
||||
from ..constant import BASE_URL, USER_NAME
|
||||
from .models import TOSHistoricalData
|
||||
from .schemas.user import User
|
||||
@@ -16,6 +17,8 @@ from .schemas.user_profile import UserProfile
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
request = Request(config.tetris.proxy.tos or config.tetris.proxy.main)
|
||||
|
||||
|
||||
class Player:
|
||||
@overload
|
||||
@@ -56,36 +59,20 @@ class Player:
|
||||
async def get_info(self) -> UserInfoSuccess:
|
||||
"""获取用户信息"""
|
||||
if self._user_info is None:
|
||||
if self.teaid is not None:
|
||||
url = [
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getTeaIdInfo',
|
||||
f'?{urlencode({"teaId":self.teaid})}',
|
||||
]
|
||||
)
|
||||
for i in BASE_URL
|
||||
]
|
||||
else:
|
||||
url = [
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getUsernameInfo',
|
||||
f'?{urlencode({"username":self.user_name})}',
|
||||
]
|
||||
)
|
||||
for i in BASE_URL
|
||||
]
|
||||
raw_user_info = await Request.failover_request(url, failover_code=[502], failover_exc=(TimeoutException,))
|
||||
path = str(
|
||||
URL('getTeaIdInfo') % {'teaId': self.teaid}
|
||||
if self.teaid is not None
|
||||
else URL('getUsernameInfo') % {'username': cast(str, self.user_name)}
|
||||
)
|
||||
raw_user_info = await request.failover_request(
|
||||
[i / path for i in BASE_URL], failover_code=[502], failover_exc=(TimeoutException,)
|
||||
)
|
||||
user_info: UserInfo = type_validate_json(UserInfo, raw_user_info) # type: ignore[arg-type]
|
||||
if not isinstance(user_info, UserInfoSuccess):
|
||||
msg = f'用户信息请求错误:\n{user_info.error}'
|
||||
raise RequestError(msg)
|
||||
self._user_info = user_info
|
||||
await anti_duplicate_add(
|
||||
TOSHistoricalData,
|
||||
TOSHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type='User Info',
|
||||
@@ -99,17 +86,11 @@ class Player:
|
||||
"""获取用户数据"""
|
||||
if other_parameter is None:
|
||||
other_parameter = {}
|
||||
params = urlencode(dict(sorted(other_parameter.items())))
|
||||
params = (URL('') % dict(sorted(other_parameter.items()))).human_repr()
|
||||
if self._user_profile.get(params) is None:
|
||||
raw_user_profile = await Request.failover_request(
|
||||
raw_user_profile = await request.failover_request(
|
||||
[
|
||||
splice_url(
|
||||
[
|
||||
i,
|
||||
'getProfile',
|
||||
f'?{urlencode({"id":self.teaid or self.user_name,**other_parameter})}',
|
||||
]
|
||||
)
|
||||
i / 'getProfile' % {'id': self.teaid or cast(str, self.user_name), **other_parameter}
|
||||
for i in BASE_URL
|
||||
],
|
||||
failover_code=[502],
|
||||
@@ -117,7 +98,6 @@ class Player:
|
||||
)
|
||||
self._user_profile[params] = type_validate_json(UserProfile, raw_user_profile)
|
||||
await anti_duplicate_add(
|
||||
TOSHistoricalData,
|
||||
TOSHistoricalData(
|
||||
user_unique_identifier=(await self.user).unique_identifier,
|
||||
api_type='User Profile',
|
||||
|
||||
@@ -6,7 +6,7 @@ from ....schemas import BaseUser
|
||||
from ...constant import GAME_TYPE
|
||||
|
||||
|
||||
class User(BaseUser):
|
||||
class User(BaseUser[Literal['TOS']]):
|
||||
platform: Literal['TOS'] = GAME_TYPE
|
||||
|
||||
teaid: str
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
from re import compile
|
||||
from typing import Literal
|
||||
|
||||
from yarl import URL
|
||||
|
||||
GAME_TYPE: Literal['TOS'] = 'TOS'
|
||||
|
||||
BASE_URL = {
|
||||
'https://teatube.cn:8888/',
|
||||
'http://cafuuchino1.studio26f.org:19970',
|
||||
URL('https://teatube.cn:8888/'),
|
||||
URL('http://cafuuchino1.studio26f.org:19970'),
|
||||
}
|
||||
|
||||
USER_NAME = compile(
|
||||
|
||||
@@ -19,6 +19,7 @@ from ...utils.host import HostPage, get_self_netloc
|
||||
from ...utils.image import get_avatar
|
||||
from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics
|
||||
from ...utils.render import render
|
||||
from ...utils.render.avatar import get_avatar as get_random_avatar
|
||||
from ...utils.render.schemas.base import People, Ranking
|
||||
from ...utils.render.schemas.tos_info import Info, Multiplayer, Radar
|
||||
from ...utils.screenshot import screenshot
|
||||
@@ -57,7 +58,9 @@ def add_special_handlers(
|
||||
user_info, game_data = await gather(player.get_info(), get_game_data(player))
|
||||
if game_data is not None:
|
||||
await UniMessage.image(
|
||||
raw=await make_query_image(user_info, game_data, event_user_info)
|
||||
raw=await make_query_image(
|
||||
user_info, game_data, None if isinstance(target, At) else event_user_info
|
||||
)
|
||||
).finish()
|
||||
await make_query_text(user_info, game_data).finish()
|
||||
except RequestError as e:
|
||||
@@ -126,13 +129,18 @@ async def _(
|
||||
user_info, game_data = await gather(player.get_info(), get_game_data(player))
|
||||
if game_data is not None:
|
||||
await (
|
||||
message + UniMessage.image(raw=await make_query_image(user_info, game_data, event_user_info))
|
||||
message
|
||||
+ UniMessage.image(
|
||||
raw=await make_query_image(
|
||||
user_info, game_data, None if isinstance(target, At) else event_user_info
|
||||
)
|
||||
)
|
||||
).finish()
|
||||
await (message + make_query_text(user_info, game_data)).finish()
|
||||
|
||||
|
||||
@alc.assign('TOS.query')
|
||||
async def _(account: Player, event_session: EventSession, event_user_info: UserInfo = EventUserInfo()): # noqa: B008
|
||||
async def _(account: Player, event_session: EventSession):
|
||||
async with trigger(
|
||||
session_persist_id=await get_session_persist_id(event_session),
|
||||
game_platform=GAME_TYPE,
|
||||
@@ -141,7 +149,7 @@ async def _(account: Player, event_session: EventSession, event_user_info: UserI
|
||||
):
|
||||
user_info, game_data = await gather(account.get_info(), get_game_data(account))
|
||||
if game_data is not None:
|
||||
await UniMessage.image(raw=await make_query_image(user_info, game_data, event_user_info)).finish()
|
||||
await UniMessage.image(raw=await make_query_image(user_info, game_data, None)).finish()
|
||||
await make_query_text(user_info, game_data).finish()
|
||||
|
||||
|
||||
@@ -184,7 +192,7 @@ async def get_game_data(player: Player, query_num: int = 50) -> GameData | None:
|
||||
break
|
||||
if num == 0:
|
||||
return None
|
||||
# TODO: 如果有效局数小于 {查询数} , 并且没有无dig信息的局, 且 user_profile.data 内有{请求数}个局, 则继续往前获取信息
|
||||
# TODO)) 如果有效局数小于 {查询数} , 并且没有无dig信息的局, 且 user_profile.data 内有{请求数}个局, 则继续往前获取信息
|
||||
metrics = get_metrics(
|
||||
lpm=weighted_total_lpm / total_time, apm=weighted_total_apm / total_time, adpm=weighted_total_adpm / total_time
|
||||
)
|
||||
@@ -197,15 +205,27 @@ async def get_game_data(player: Player, query_num: int = 50) -> GameData | None:
|
||||
)
|
||||
|
||||
|
||||
async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, event_user_info: UserInfo) -> bytes:
|
||||
async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, event_user_info: UserInfo | None) -> bytes:
|
||||
metrics = game_data.metrics
|
||||
duration = timedelta(milliseconds=float(user_info.data.pb_sprint)).total_seconds()
|
||||
sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004
|
||||
sprint_value = (
|
||||
(
|
||||
f'{duration:.3f}s'
|
||||
if (duration := timedelta(milliseconds=float(user_info.data.pb_sprint)).total_seconds()) < 60 # noqa: PLR2004
|
||||
else f'{duration // 60:.0f}m {duration % 60:.3f}s'
|
||||
)
|
||||
if user_info.data.pb_sprint != '2147483647'
|
||||
else 'N/A'
|
||||
)
|
||||
async with HostPage(
|
||||
await render(
|
||||
'v1/tos/info',
|
||||
Info(
|
||||
user=People(avatar=await get_avatar(event_user_info, 'Data URI', None), name=user_info.data.name),
|
||||
user=People(
|
||||
avatar=await get_avatar(event_user_info, 'Data URI', None)
|
||||
if event_user_info is not None
|
||||
else get_random_avatar(user_info.data.teaid),
|
||||
name=user_info.data.name,
|
||||
),
|
||||
ranking=Ranking(rating=float(user_info.data.ranking), rd=round(float(user_info.data.rd_now), 2)),
|
||||
multiplayer=Multiplayer(
|
||||
pps=metrics.pps,
|
||||
|
||||
@@ -1,16 +1,20 @@
|
||||
from functools import cache
|
||||
from hashlib import sha256
|
||||
from ipaddress import IPv4Address, IPv6Address
|
||||
from pathlib import Path as FilePath
|
||||
from typing import TYPE_CHECKING, ClassVar, Literal
|
||||
|
||||
from fastapi import FastAPI, Path, status
|
||||
from aiofiles import open
|
||||
from fastapi import BackgroundTasks, FastAPI, Path, status
|
||||
from fastapi.responses import FileResponse, HTMLResponse, Response
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from nonebot import get_app, get_driver
|
||||
from nonebot.log import logger
|
||||
from yarl import URL
|
||||
|
||||
from ..config.config import CACHE_PATH
|
||||
from ..games.tetrio.api.cache import request
|
||||
from .image import img_to_png
|
||||
from .request import Request
|
||||
from .templates import TEMPLATES_DIR
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -22,6 +26,7 @@ driver = get_driver()
|
||||
|
||||
global_config = driver.config
|
||||
|
||||
BASE_URL = URL('https://tetr.io/user-content/')
|
||||
|
||||
if not isinstance(app, FastAPI):
|
||||
msg = '本插件需要 FastAPI 驱动器才能运行'
|
||||
@@ -55,7 +60,7 @@ def _():
|
||||
|
||||
|
||||
@app.get('/host/{page_hash}.html', status_code=status.HTTP_200_OK)
|
||||
async def _(page_hash: str) -> HTMLResponse:
|
||||
def _(page_hash: str) -> HTMLResponse:
|
||||
if page_hash in HostPage.pages:
|
||||
return HTMLResponse(HostPage.pages[page_hash])
|
||||
return NOT_FOUND
|
||||
@@ -63,20 +68,30 @@ async def _(page_hash: str) -> HTMLResponse:
|
||||
|
||||
@app.get('/host/resource/tetrio/{resource_type}/{user_id}', status_code=status.HTTP_200_OK)
|
||||
async def _(
|
||||
resource_type: Literal['avatars', 'banners'], revision: int, user_id: str = Path(regex=r'^[a-f0-9]{24}$')
|
||||
resource_type: Literal['avatars', 'banners'],
|
||||
revision: int,
|
||||
background_tasks: BackgroundTasks,
|
||||
user_id: str = Path(regex=r'^[a-f0-9]{24}$'),
|
||||
) -> Response:
|
||||
if not (path := CACHE_PATH / 'tetrio' / resource_type / f'{user_id}_{revision}.png').exists():
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_bytes(
|
||||
img_to_png(
|
||||
await Request.request(
|
||||
f'https://tetr.io/user-content/{resource_type}/{user_id}.jpg?rv={revision}', is_json=False
|
||||
)
|
||||
image = img_to_png(
|
||||
await request.request(
|
||||
BASE_URL / resource_type / f'{user_id}.jpg' % {'rv': revision},
|
||||
is_json=False,
|
||||
)
|
||||
)
|
||||
background_tasks.add_task(write_cache, path=path, data=image)
|
||||
return Response(content=image, media_type='image/png')
|
||||
return FileResponse(path)
|
||||
|
||||
|
||||
async def write_cache(path: FilePath, data: bytes) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
async with open(path, 'wb') as file:
|
||||
await file.write(data)
|
||||
|
||||
|
||||
@cache
|
||||
def get_self_netloc() -> str:
|
||||
host: IPv4Address | IPv6Address | IPvAnyAddress = global_config.host
|
||||
if isinstance(host, IPv4Address):
|
||||
|
||||
34
nonebot_plugin_tetris_stats/utils/render/avatar/__init__.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from base64 import b64encode
|
||||
from io import BytesIO
|
||||
from random import Random
|
||||
|
||||
from PIL import Image
|
||||
from PIL.Image import Resampling
|
||||
|
||||
from .draw import PIECE_MEMBERS, SkinManager
|
||||
|
||||
|
||||
def get_avatar(send: float | str | bytes | bytearray | None = None) -> str:
|
||||
random = Random(send) # noqa: S311
|
||||
skin = (
|
||||
SkinManager.get_skin(send)
|
||||
.get_piece(random.choice(PIECE_MEMBERS))
|
||||
.rotate(
|
||||
random.randint(-360, 360),
|
||||
expand=True,
|
||||
resample=Resampling.BICUBIC,
|
||||
)
|
||||
)
|
||||
skin = skin.crop(skin.getbbox())
|
||||
background = Image.new('RGBA', (2048, 2048), '#e5e5e5')
|
||||
|
||||
skin_ratio = min(1536 / skin.width, 1536 / skin.height)
|
||||
|
||||
new_size = (int(skin.width * skin_ratio), int(skin.height * skin_ratio))
|
||||
skin = skin.resize(new_size, Resampling.BICUBIC)
|
||||
|
||||
background.paste(skin, ((background.width - skin.width) // 2, (background.height - skin.height) // 2), mask=skin)
|
||||
background = background.resize((512, 512), Resampling.LANCZOS)
|
||||
with BytesIO() as output:
|
||||
background.save(output, format='PNG')
|
||||
return f'data:image/png;base64,{b64encode(output.getvalue()).decode("utf-8")}'
|
||||
169
nonebot_plugin_tetris_stats/utils/render/avatar/draw/__init__.py
Normal file
@@ -0,0 +1,169 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from random import Random
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from PIL.Image import Image
|
||||
from typing_extensions import Self
|
||||
|
||||
|
||||
class Piece(Enum):
|
||||
Z = (
|
||||
(True, True, False),
|
||||
(False, True, True),
|
||||
)
|
||||
|
||||
S = (
|
||||
(False, True, True),
|
||||
(True, True, False),
|
||||
)
|
||||
|
||||
J = (
|
||||
(True, False, False),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
L = (
|
||||
(False, False, True),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
T = (
|
||||
(False, True, False),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
I = ( # noqa: E741
|
||||
(True, True, True, True),
|
||||
)
|
||||
|
||||
O = ( # noqa: E741
|
||||
(True, True),
|
||||
(True, True),
|
||||
)
|
||||
|
||||
I5 = (
|
||||
(True, True, True, True, True), # fmt: skip
|
||||
)
|
||||
|
||||
V = (
|
||||
(True, False, False),
|
||||
(True, False, False),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
T5 = (
|
||||
(True, True, True),
|
||||
(False, True, False),
|
||||
(False, True, False),
|
||||
)
|
||||
|
||||
U = (
|
||||
(True, False, True),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
W = (
|
||||
(True, False, False),
|
||||
(True, True, False),
|
||||
(False, True, True),
|
||||
)
|
||||
|
||||
X = (
|
||||
(False, True, False),
|
||||
(True, True, True),
|
||||
(False, True, False),
|
||||
)
|
||||
|
||||
J5 = (
|
||||
(True, False, False, False),
|
||||
(True, True, True, True),
|
||||
)
|
||||
|
||||
L5 = (
|
||||
(False, False, False, True),
|
||||
(True, True, True, True),
|
||||
)
|
||||
|
||||
H = (
|
||||
(False, False, True, True),
|
||||
(True, True, True, False),
|
||||
)
|
||||
|
||||
N = (
|
||||
(True, True, False, False),
|
||||
(False, True, True, True),
|
||||
)
|
||||
|
||||
Y = (
|
||||
(False, True, False, False),
|
||||
(True, True, True, True),
|
||||
)
|
||||
|
||||
R = (
|
||||
(False, False, True, False),
|
||||
(True, True, True, True),
|
||||
)
|
||||
|
||||
P = (
|
||||
(True, True, False),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
Q = (
|
||||
(False, True, True),
|
||||
(True, True, True),
|
||||
)
|
||||
|
||||
F = (
|
||||
(True, False, False),
|
||||
(True, True, True),
|
||||
(False, True, False),
|
||||
)
|
||||
|
||||
E = (
|
||||
(False, False, True),
|
||||
(True, True, True),
|
||||
(False, True, False),
|
||||
)
|
||||
|
||||
S5 = (
|
||||
(False, True, True),
|
||||
(False, True, False),
|
||||
(True, True, False),
|
||||
)
|
||||
|
||||
Z5 = (
|
||||
(True, True, False),
|
||||
(False, True, False),
|
||||
(False, True, True),
|
||||
)
|
||||
|
||||
|
||||
PIECE_MEMBERS = tuple(Piece)
|
||||
|
||||
|
||||
class SkinManager:
|
||||
skin: ClassVar[list['Skin']] = []
|
||||
|
||||
@classmethod
|
||||
def register(cls, skin: 'Skin') -> None:
|
||||
cls.skin.append(skin)
|
||||
|
||||
@classmethod
|
||||
def get_skin(cls, send: float | str | bytes | bytearray | None = None) -> 'Skin':
|
||||
return Random(send).choice(cls.skin) # noqa: S311
|
||||
|
||||
|
||||
class Skin(ABC):
|
||||
def __new__(cls, *args: Any, **kwargs: Any) -> Self: # noqa: ANN401, ARG003
|
||||
instance = super().__new__(cls)
|
||||
SkinManager.register(instance)
|
||||
return instance
|
||||
|
||||
@abstractmethod
|
||||
def get_piece(self, piece: Piece) -> Image:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
from . import tech # noqa: E402, F401
|
||||
@@ -0,0 +1,94 @@
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
|
||||
from nonebot import get_driver
|
||||
from PIL import Image
|
||||
from PIL.Image import Resampling
|
||||
from typing_extensions import override
|
||||
|
||||
from .. import Piece, Skin
|
||||
|
||||
SINGLE = 30
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
class Block(Enum):
|
||||
Z = (0, 0, 30, 30)
|
||||
Y = (30, 0, 60, 30)
|
||||
L = (60, 0, 90, 30)
|
||||
O = (90, 0, 120, 30) # noqa: E741
|
||||
U = (120, 0, 150, 30)
|
||||
Q = (150, 0, 180, 30)
|
||||
S = (180, 0, 210, 30)
|
||||
H = (210, 0, 240, 30)
|
||||
I = (0, 30, 30, 60) # noqa: E741
|
||||
F = (30, 30, 60, 60)
|
||||
J = (60, 30, 90, 60)
|
||||
R = (90, 30, 120, 60)
|
||||
C = (120, 30, 150, 60)
|
||||
T = (150, 30, 180, 60)
|
||||
W = (180, 30, 210, 60)
|
||||
N = (210, 30, 240, 60)
|
||||
|
||||
|
||||
piece_block_mapping = {
|
||||
Piece.Z: Block.Z,
|
||||
Piece.S: Block.S,
|
||||
Piece.J: Block.J,
|
||||
Piece.L: Block.L,
|
||||
Piece.T: Block.T,
|
||||
Piece.I: Block.I,
|
||||
Piece.O: Block.O,
|
||||
Piece.I5: Block.O,
|
||||
Piece.V: Block.I,
|
||||
Piece.T5: Block.C,
|
||||
Piece.U: Block.U,
|
||||
Piece.W: Block.W,
|
||||
Piece.X: Block.O,
|
||||
Piece.J5: Block.J,
|
||||
Piece.L5: Block.L,
|
||||
Piece.H: Block.H,
|
||||
Piece.N: Block.N,
|
||||
Piece.R: Block.R,
|
||||
Piece.Y: Block.Y,
|
||||
Piece.P: Block.Y,
|
||||
Piece.Q: Block.Q,
|
||||
Piece.F: Block.F,
|
||||
Piece.E: Block.Y,
|
||||
Piece.S5: Block.S,
|
||||
Piece.Z5: Block.Z,
|
||||
}
|
||||
|
||||
|
||||
class TechSkin(Skin):
|
||||
def __init__(self, path: Path, name: str | None = None) -> None:
|
||||
self.path = path
|
||||
self.name = name or path.name
|
||||
self.image = Image.open(path)
|
||||
self._block_cache: dict[Block, Image.Image] = {}
|
||||
|
||||
def get_block(self, block: Block) -> Image.Image:
|
||||
return self._block_cache.setdefault(block, self.image.crop(block.value))
|
||||
|
||||
def draw_piece(self, block: Block, piece: Piece, scale: int = 10) -> Image.Image:
|
||||
canvas = Image.new(
|
||||
'RGBA', (len(piece.value[0]) * SINGLE * scale, len(piece.value) * SINGLE * scale), (0, 0, 0, 0)
|
||||
)
|
||||
block_img = self.get_block(block).resize((SINGLE * scale, SINGLE * scale), resample=Resampling.BICUBIC)
|
||||
for i, row in enumerate(piece.value):
|
||||
for j, mino in enumerate(row):
|
||||
if mino:
|
||||
canvas.paste(block_img, (j * SINGLE * scale, i * SINGLE * scale))
|
||||
return canvas
|
||||
|
||||
@override
|
||||
def get_piece(self, piece: Piece) -> Image.Image:
|
||||
return self.draw_piece(piece_block_mapping[piece], piece)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
def _():
|
||||
path = Path(__file__).parent / 'skins'
|
||||
for i in sorted(path.iterdir()):
|
||||
TechSkin(i)
|
||||
|
After Width: | Height: | Size: 4.3 KiB |
|
After Width: | Height: | Size: 17 KiB |
|
After Width: | Height: | Size: 2.0 KiB |
|
After Width: | Height: | Size: 4.6 KiB |
|
After Width: | Height: | Size: 8.0 KiB |
|
After Width: | Height: | Size: 8.8 KiB |
|
After Width: | Height: | Size: 5.0 KiB |
|
After Width: | Height: | Size: 2.6 KiB |
|
After Width: | Height: | Size: 12 KiB |
|
After Width: | Height: | Size: 18 KiB |
|
After Width: | Height: | Size: 1.6 KiB |
|
After Width: | Height: | Size: 13 KiB |
|
After Width: | Height: | Size: 2.1 KiB |
|
After Width: | Height: | Size: 12 KiB |
|
After Width: | Height: | Size: 12 KiB |
|
After Width: | Height: | Size: 4.1 KiB |
|
After Width: | Height: | Size: 9.4 KiB |
|
After Width: | Height: | Size: 4.4 KiB |
|
After Width: | Height: | Size: 3.4 KiB |
|
After Width: | Height: | Size: 41 KiB |
|
After Width: | Height: | Size: 4.9 KiB |
|
After Width: | Height: | Size: 1.0 KiB |
|
After Width: | Height: | Size: 2.3 KiB |
|
After Width: | Height: | Size: 1.0 KiB |
|
After Width: | Height: | Size: 613 B |
|
After Width: | Height: | Size: 2.9 KiB |
|
After Width: | Height: | Size: 32 KiB |
|
After Width: | Height: | Size: 837 B |
|
After Width: | Height: | Size: 2.5 KiB |
|
After Width: | Height: | Size: 8.5 KiB |
|
After Width: | Height: | Size: 9.9 KiB |
|
After Width: | Height: | Size: 3.0 KiB |
@@ -70,6 +70,4 @@ class Record(BaseModel):
|
||||
rank: int | None
|
||||
personal_rank: int | None
|
||||
|
||||
statistic: Statistic
|
||||
|
||||
play_at: datetime
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from .base import Record as BaseRecord
|
||||
from .base import Statistic
|
||||
|
||||
|
||||
class Record(BaseRecord):
|
||||
statistic: Statistic
|
||||
time: str
|
||||
|
||||
@@ -3,8 +3,7 @@ from typing import Literal
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from ......games.tetrio.api.schemas.user_records import Zen
|
||||
from ......games.tetrio.api.typing import Rank, ValidRank
|
||||
from ......games.tetrio.api.typing import Rank
|
||||
from .....typing import Number
|
||||
from ...base import Avatar
|
||||
from .base import TetraLeagueHistoryData
|
||||
@@ -54,7 +53,7 @@ class TetraLeagueStatistic(BaseModel):
|
||||
|
||||
class TetraLeague(BaseModel):
|
||||
rank: Rank
|
||||
highest_rank: ValidRank
|
||||
highest_rank: Rank
|
||||
|
||||
tr: Number
|
||||
|
||||
@@ -91,6 +90,11 @@ class Blitz(BaseModel):
|
||||
play_at: datetime
|
||||
|
||||
|
||||
class Zen(BaseModel):
|
||||
level: int
|
||||
score: int
|
||||
|
||||
|
||||
class Info(BaseModel):
|
||||
user: User
|
||||
tetra_league: TetraLeague | None
|
||||
|
||||
@@ -1,54 +1,79 @@
|
||||
from collections.abc import Sequence
|
||||
from http import HTTPStatus
|
||||
from urllib.parse import urljoin, urlparse
|
||||
from typing import Any
|
||||
|
||||
from aiofiles import open
|
||||
from httpx import AsyncClient, HTTPError
|
||||
from nonebot import get_driver, get_plugin_config
|
||||
from msgspec import DecodeError, Struct, json
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from playwright.async_api import Response
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
from yarl import URL
|
||||
|
||||
from ..config.config import CACHE_PATH, Config
|
||||
from ..config.config import CACHE_PATH, config
|
||||
from .browser import BrowserManager
|
||||
from .exception import RequestError
|
||||
|
||||
driver = get_driver()
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await Request.init_cache()
|
||||
await Request.read_cache()
|
||||
class CloudflareCache(Struct):
|
||||
headers: dict[str, Any] | None = None
|
||||
cookies: dict[str, Any] | None = None
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await Request.write_cache()
|
||||
encoder = json.Encoder()
|
||||
decoder = json.Decoder()
|
||||
|
||||
|
||||
def splice_url(url_list: list[str]) -> str:
|
||||
url = ''
|
||||
if len(url_list):
|
||||
url = url_list.pop(0)
|
||||
for i in url_list:
|
||||
url = urljoin(url, i)
|
||||
return url
|
||||
class AntiCloudflare:
|
||||
cache_decoder = json.Decoder(type=CloudflareCache)
|
||||
|
||||
def __init__(self, domain_suffix: str) -> None:
|
||||
self.domain_suffix = domain_suffix
|
||||
self.cache_path = CACHE_PATH / f'{self.domain_suffix}_cloudflare_cache.json'
|
||||
self._headers: dict | None = None
|
||||
self._cookies: dict | None = None
|
||||
self.read_cache()
|
||||
|
||||
class Request:
|
||||
"""网络请求相关类"""
|
||||
def read_cache(self) -> None:
|
||||
"""读取缓存文件"""
|
||||
try:
|
||||
cache: CloudflareCache = self.cache_decoder.decode(self.cache_path.read_text(encoding='UTF-8'))
|
||||
self._headers = cache.headers
|
||||
self._cookies = cache.cookies
|
||||
except (OSError, DecodeError):
|
||||
self.cache_path.unlink()
|
||||
self.write_cache()
|
||||
|
||||
_CACHE_FILE = CACHE_PATH / 'cloudflare_cache.json'
|
||||
_headers: dict | None = None
|
||||
_cookies: dict | None = None
|
||||
def write_cache(self) -> None:
|
||||
"""写入缓存文件"""
|
||||
self.cache_path.write_bytes(json.encode(CloudflareCache(headers=self.headers, cookies=self.cookies)))
|
||||
|
||||
@classmethod
|
||||
async def _anti_cloudflare(cls, url: str) -> bytes:
|
||||
@property
|
||||
def headers(self) -> dict | None:
|
||||
return self._headers
|
||||
|
||||
@headers.setter
|
||||
def headers(self, value: dict | None) -> None:
|
||||
self._headers = value
|
||||
self.write_cache()
|
||||
|
||||
@property
|
||||
def cookies(self) -> dict | None:
|
||||
return self._cookies
|
||||
|
||||
@cookies.setter
|
||||
def cookies(self, value: dict | None) -> None:
|
||||
self._cookies = value
|
||||
self.write_cache()
|
||||
|
||||
async def __call__(self, url: str, proxy: str | None = None) -> bytes:
|
||||
"""用firefox硬穿五秒盾"""
|
||||
browser = await BrowserManager.get_browser()
|
||||
async with await browser.new_context() as context, await context.new_page() as page:
|
||||
async with (
|
||||
await browser.new_context(proxy={'server': proxy} if proxy is not None else None) as context,
|
||||
await context.new_page() as page,
|
||||
):
|
||||
response = await page.goto(url)
|
||||
attempts = 0
|
||||
while attempts < 60: # noqa: PLR2004
|
||||
@@ -61,84 +86,68 @@ class Request:
|
||||
logger.warning('疑似触发了 Cloudflare 的验证码')
|
||||
break
|
||||
try:
|
||||
loads(text)
|
||||
except JSONDecodeError:
|
||||
decoder.decode(text)
|
||||
except DecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
if not isinstance(response, Response):
|
||||
msg = 'api请求失败'
|
||||
raise RequestError(msg)
|
||||
cls._headers = await response.request.all_headers()
|
||||
self.headers = await response.request.all_headers()
|
||||
try:
|
||||
cls._cookies = {
|
||||
self.cookies = {
|
||||
name: value
|
||||
for i in await context.cookies()
|
||||
if (name := i.get('name')) is not None and (value := i.get('value')) is not None
|
||||
}
|
||||
except KeyError:
|
||||
cls._cookies = None
|
||||
self.cookies = None
|
||||
return await response.body()
|
||||
msg = '绕过五秒盾失败'
|
||||
raise RequestError(msg)
|
||||
|
||||
@classmethod
|
||||
async def init_cache(cls) -> None:
|
||||
"""初始化缓存文件"""
|
||||
if not cls._CACHE_FILE.exists():
|
||||
async with open(file=cls._CACHE_FILE, mode='w', encoding='UTF-8') as file:
|
||||
await file.write(dumps({'headers': cls._headers, 'cookies': cls._cookies}))
|
||||
|
||||
@classmethod
|
||||
async def read_cache(cls) -> None:
|
||||
"""读取缓存文件"""
|
||||
try:
|
||||
async with open(file=cls._CACHE_FILE, mode='r', encoding='UTF-8') as file:
|
||||
json = loads(await file.read())
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except (PermissionError, JSONDecodeError):
|
||||
cls._CACHE_FILE.unlink()
|
||||
await cls.init_cache()
|
||||
else:
|
||||
cls._headers = json['headers']
|
||||
cls._cookies = json['cookies']
|
||||
class Request:
|
||||
"""网络请求相关类"""
|
||||
|
||||
@classmethod
|
||||
async def write_cache(cls) -> None:
|
||||
"""写入缓存文件"""
|
||||
try:
|
||||
async with open(file=cls._CACHE_FILE, mode='r+', encoding='UTF-8') as file:
|
||||
await file.write(dumps({'headers': cls._headers, 'cookies': cls._cookies}))
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except (PermissionError, JSONDecodeError):
|
||||
cls._CACHE_FILE.unlink()
|
||||
await cls.init_cache()
|
||||
def __init__(self, proxy: str | None) -> None:
|
||||
self.proxy = proxy
|
||||
self.anti_cloudflares: dict[str, AntiCloudflare] = {}
|
||||
|
||||
@classmethod
|
||||
async def request(cls, url: str, *, is_json: bool = True) -> bytes:
|
||||
async def request(
|
||||
self,
|
||||
url: URL,
|
||||
*,
|
||||
is_json: bool = True,
|
||||
enable_anti_cloudflare: bool = False,
|
||||
) -> bytes:
|
||||
"""请求api"""
|
||||
if (anti_cloudflare := self.anti_cloudflares.get(url.host or '')) is not None:
|
||||
cookies = anti_cloudflare.cookies
|
||||
headers = anti_cloudflare.headers
|
||||
else:
|
||||
cookies = None
|
||||
headers = None
|
||||
try:
|
||||
async with AsyncClient(cookies=cls._cookies, timeout=config.tetris_req_timeout) as session:
|
||||
response = await session.get(url, headers=cls._headers)
|
||||
async with AsyncClient(cookies=cookies, timeout=config.tetris.request_timeout, proxy=self.proxy) as session:
|
||||
response = await session.get(str(url), headers=headers)
|
||||
if response.status_code != HTTPStatus.OK:
|
||||
msg = f'请求错误 code: {response.status_code} {HTTPStatus(response.status_code).phrase}\n{response.text}'
|
||||
raise RequestError(msg, status_code=response.status_code)
|
||||
if is_json:
|
||||
loads(response.content)
|
||||
decoder.decode(response.content)
|
||||
return response.content
|
||||
except HTTPError as e:
|
||||
msg = f'请求错误 \n{e!r}'
|
||||
raise RequestError(msg) from e
|
||||
except JSONDecodeError:
|
||||
if urlparse(url).netloc.lower().endswith('tetr.io'):
|
||||
return await cls._anti_cloudflare(url)
|
||||
except DecodeError: # 由于捕获的是 DecodeError 所以一定是 is_json = True
|
||||
if enable_anti_cloudflare and url.host is not None:
|
||||
return await self.anti_cloudflares.setdefault(url.host, AntiCloudflare(url.host))(str(url), self.proxy)
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
async def failover_request(
|
||||
cls,
|
||||
urls: Sequence[str],
|
||||
self,
|
||||
urls: Sequence[URL],
|
||||
*,
|
||||
failover_code: Sequence[int],
|
||||
failover_exc: tuple[type[BaseException], ...],
|
||||
@@ -148,7 +157,7 @@ class Request:
|
||||
for i in urls:
|
||||
logger.debug(f'尝试请求 {i}')
|
||||
try:
|
||||
return await cls.request(i, is_json=is_json)
|
||||
return await self.request(i, is_json=is_json)
|
||||
except RequestError as e:
|
||||
if e.status_code in failover_code: # 如果状态码在 failover_code 中, 则继续尝试下一个URL
|
||||
error_list.append(e)
|
||||
|
||||
@@ -1,23 +1,19 @@
|
||||
from nonebot import get_plugin_config
|
||||
from playwright.async_api import TimeoutError, ViewportSize
|
||||
|
||||
from ..config.config import Config
|
||||
from ..config.config import config
|
||||
from .browser import BrowserManager
|
||||
from .retry import retry
|
||||
from .time_it import time_it
|
||||
|
||||
config = get_plugin_config(Config)
|
||||
|
||||
|
||||
@retry(exception_type=TimeoutError, reply='截图失败, 重试中')
|
||||
@time_it
|
||||
async def screenshot(url: str) -> bytes:
|
||||
browser = await BrowserManager.get_browser()
|
||||
async with (
|
||||
await browser.new_page(device_scale_factor=config.tetris_screenshot_quality) as page,
|
||||
await browser.new_page(device_scale_factor=config.tetris.screenshot_quality) as page,
|
||||
):
|
||||
await page.goto(url)
|
||||
await page.wait_for_load_state('networkidle')
|
||||
size: ViewportSize = await page.evaluate("""
|
||||
() => {
|
||||
const element = document.querySelector('#content');
|
||||
@@ -28,4 +24,5 @@ async def screenshot(url: str) -> bytes:
|
||||
};
|
||||
""")
|
||||
await page.set_viewport_size(size)
|
||||
return await page.locator('id=content').screenshot(timeout=5000, type='png')
|
||||
await page.wait_for_load_state('networkidle')
|
||||
return await page.locator('id=content').screenshot(animations='disabled', timeout=5000, type='png')
|
||||
|
||||
@@ -11,19 +11,20 @@ from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from nonebot.permission import SUPERUSER
|
||||
from nonebot_plugin_alconna import Alconna, Args, Option, on_alconna
|
||||
from nonebot_plugin_localstore import get_cache_file, get_data_dir
|
||||
from rich.progress import Progress
|
||||
|
||||
from ..config.config import CACHE_PATH, DATA_PATH, config
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
TEMPLATES_DIR = get_data_dir('nonebot_plugin_tetris_stats') / 'templates'
|
||||
TEMPLATES_DIR = DATA_PATH / 'templates'
|
||||
|
||||
alc = on_alconna(Alconna('更新模板', Option('--revision', Args['revision', str], alias={'-R'})), permission=SUPERUSER)
|
||||
|
||||
|
||||
async def download_templates(tag: str) -> Path:
|
||||
logger.info(f'开始下载模板 {tag}')
|
||||
async with AsyncClient() as client:
|
||||
async with AsyncClient(proxy=config.tetris.proxy.github or config.tetris.proxy.main) as client:
|
||||
if tag == 'latest':
|
||||
logger.info('目标为 latest, 正在获取最新版本号')
|
||||
tag = (
|
||||
@@ -36,7 +37,7 @@ async def download_templates(tag: str) -> Path:
|
||||
.rsplit('/', 1)[-1]
|
||||
)
|
||||
logger.success(f'获取到的最新版本号: {tag}')
|
||||
path = get_cache_file('nonebot_plugin_tetris_stats', f'dist_{time_ns()}.zip')
|
||||
path = CACHE_PATH / f'dist_{time_ns()}.zip'
|
||||
with Progress() as progress:
|
||||
task_id = progress.add_task('[red]Downloading...', total=None)
|
||||
async with (
|
||||
@@ -56,7 +57,7 @@ async def download_templates(tag: str) -> Path:
|
||||
return path
|
||||
|
||||
|
||||
async def unzip_templates(zip_path: Path) -> Path:
|
||||
def unzip_templates(zip_path: Path) -> Path:
|
||||
logger.info('开始解压模板')
|
||||
temp_path = TEMPLATES_DIR.parent / f'temp_{time_ns()}'
|
||||
with ZipFile(zip_path) as zip_file:
|
||||
@@ -91,7 +92,7 @@ async def check_hash(hash_file_path: Path) -> bool:
|
||||
|
||||
async def init_templates(tag: str) -> bool:
|
||||
logger.info(f'开始初始化模板 {tag}')
|
||||
temp_path = await unzip_templates(await download_templates(tag))
|
||||
temp_path = unzip_templates(await download_templates(tag))
|
||||
if not await check_hash(temp_path / 'hash.sha256'):
|
||||
rmtree(temp_path)
|
||||
return False
|
||||
@@ -104,7 +105,7 @@ async def init_templates(tag: str) -> bool:
|
||||
|
||||
|
||||
async def check_tag(tag: str) -> bool:
|
||||
async with AsyncClient() as client:
|
||||
async with AsyncClient(proxy=config.tetris.proxy.github or config.tetris.proxy.main) as client:
|
||||
return (
|
||||
await client.get(f'https://github.com/A-Minos/tetris-stats-templates/releases/tag/{tag}')
|
||||
).status_code != HTTPStatus.NOT_FOUND
|
||||
|
||||
854
poetry.lock
generated
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = 'nonebot-plugin-tetris-stats'
|
||||
version = '1.4.5'
|
||||
version = '1.4.17'
|
||||
description = '一款基于 NoneBot2 的用于查询 Tetris 相关游戏数据的插件'
|
||||
authors = ['scdhh <wallfjjd@gmail.com>']
|
||||
readme = 'README.md'
|
||||
@@ -10,48 +10,49 @@ license = 'AGPL-3.0'
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = '^3.10'
|
||||
nonebot2 = { extras = ["fastapi"], version = "^2.3.0" }
|
||||
nonebot-plugin-alconna = ">=0.48.0"
|
||||
nonebot-plugin-apscheduler = ">=0.4,<0.6"
|
||||
nonebot-plugin-localstore = ">=0.6,<0.8"
|
||||
nonebot-plugin-orm = ">=0.1.1,<0.8.0"
|
||||
nonebot-plugin-session = "^0.3.1"
|
||||
nonebot-plugin-session-orm = "^0.2.0"
|
||||
nonebot-plugin-user = ">=0.2,<0.4"
|
||||
nonebot-plugin-userinfo = "^0.2.4"
|
||||
aiocache = "^0.12.2"
|
||||
aiofiles = ">=23.2.1,<25.0.0"
|
||||
async-lru = "^2.0.4"
|
||||
httpx = "^0.27.0"
|
||||
jinja2 = "^3.1.3"
|
||||
nonebot2 = { extras = ['fastapi'], version = '^2.3.0' }
|
||||
nonebot-plugin-alconna = '>=0.48.0'
|
||||
nonebot-plugin-apscheduler = '>=0.4,<0.6'
|
||||
nonebot-plugin-localstore = '>=0.6,<0.8'
|
||||
nonebot-plugin-orm = '>=0.1.1,<0.8.0'
|
||||
nonebot-plugin-session = '^0.3.1'
|
||||
nonebot-plugin-session-orm = '^0.2.0'
|
||||
nonebot-plugin-user = '>=0.2,<0.5'
|
||||
nonebot-plugin-userinfo = '^0.2.4'
|
||||
aiocache = '^0.12.2'
|
||||
aiofiles = '>=23.2.1,<25.0.0'
|
||||
async-lru = '^2.0.4'
|
||||
httpx = '^0.27.0'
|
||||
jinja2 = '^3.1.3'
|
||||
lxml = '^5.1.0'
|
||||
msgspec = "^0.18.6"
|
||||
pandas = '>=1.4.3,<3.0.0'
|
||||
pillow = "^10.3.0"
|
||||
pillow = '^10.3.0'
|
||||
playwright = '^1.41.2'
|
||||
rich = "^13.7.1"
|
||||
ujson = '^5.9.0'
|
||||
zstandard = ">=0.22,<0.24"
|
||||
rich = '^13.7.1'
|
||||
yarl = "^1.9.4"
|
||||
zstandard = '>=0.22,<0.24'
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
mypy = '>=1.9'
|
||||
ruff = '>=0.3.0'
|
||||
types-aiofiles = ">=23.2.0.20240106,<25.0.0.0"
|
||||
types-lxml = "^2024.2.9"
|
||||
types-pillow = "^10.2.0.20240423"
|
||||
types-aiofiles = '>=23.2.0.20240106,<25.0.0.0'
|
||||
types-lxml = '^2024.2.9'
|
||||
types-pillow = '^10.2.0.20240423'
|
||||
types-ujson = '^5.9.0'
|
||||
pandas-stubs = '>=1.5.2,<3.0.0'
|
||||
nonebot2 = { extras = ["all"], version = "^2.3.0" }
|
||||
nonebot-adapter-discord = "^0.1.3"
|
||||
nonebot-adapter-kaiheila = "^0.3.4"
|
||||
nonebot-adapter-onebot = "^2.4.1"
|
||||
nonebot-adapter-qq = "^1.4.4"
|
||||
nonebot-adapter-satori = ">=0.11.4,<0.13.0"
|
||||
nonebot-plugin-orm = { extras = ["default"], version = ">=0.3,<0.8" }
|
||||
nonebot2 = { extras = ['all'], version = '^2.3.0' }
|
||||
nonebot-adapter-discord = '^0.1.3'
|
||||
nonebot-adapter-kaiheila = '^0.3.4'
|
||||
nonebot-adapter-onebot = '^2.4.1'
|
||||
nonebot-adapter-qq = '^1.4.4'
|
||||
nonebot-adapter-satori = '>=0.11.4,<0.13.0'
|
||||
nonebot-plugin-orm = { extras = ['default'], version = '>=0.3,<0.8' }
|
||||
|
||||
[tool.poetry.group.debug.dependencies]
|
||||
memory-profiler = "^0.61.0"
|
||||
memory-profiler = '^0.61.0'
|
||||
objprint = '^0.2.2'
|
||||
viztracer = "^0.16.2"
|
||||
viztracer = '^0.16.2'
|
||||
|
||||
[build-system]
|
||||
requires = ['poetry-core>=1.0.0']
|
||||
@@ -59,7 +60,7 @@ build-backend = 'poetry.core.masonry.api'
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 120
|
||||
target-version = "py310"
|
||||
target-version = 'py310'
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = [
|
||||
@@ -88,10 +89,12 @@ select = [
|
||||
'ICN', # flake8-import-conventions
|
||||
'PIE', # flake8-pie
|
||||
'T20', # flake8-print
|
||||
'PYI', # flake8-pyi
|
||||
'Q', # flake8-quotes
|
||||
'RSE', # flake8-raise
|
||||
'RET', # flake8-return
|
||||
'SLF', # flake8-self
|
||||
'SLOT', # flake8-slots
|
||||
'SIM', # flake8-simplify
|
||||
'TID', # flake8-tidy-imports
|
||||
'TCH', # flake8-type-checking
|
||||
@@ -103,6 +106,7 @@ select = [
|
||||
'PL', # pylint
|
||||
'TRY', # tryceratops
|
||||
'FLY', # flynt
|
||||
'FAST', # FastAPI
|
||||
'PERF', # Perflint
|
||||
'FURB', # refurb
|
||||
'RUF', # Ruff-specific rules
|
||||
@@ -123,7 +127,7 @@ flake8-quotes = { inline-quotes = 'single', multiline-quotes = 'double' }
|
||||
mypy-init-return = true
|
||||
|
||||
[tool.ruff.lint.flake8-builtins]
|
||||
builtins-ignorelist = ["id"]
|
||||
builtins-ignorelist = ['id']
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = 'single'
|
||||
|
||||