适配新 records API

This commit is contained in:
2024-08-09 16:23:12 +08:00
parent 429f99f77e
commit 02e703ea91
6 changed files with 84 additions and 17 deletions

View File

@@ -1,5 +1,6 @@
from enum import Enum
from types import MappingProxyType
from typing import Literal, overload
from typing import Literal, NamedTuple, cast, overload
from async_lru import alru_cache
from nonebot.compat import type_validate_json
@@ -11,24 +12,47 @@ from ..constant import BASE_URL, USER_ID, USER_NAME
from .cache import Cache
from .models import TETRIOHistoricalData
from .schemas.base import FailedModel
from .schemas.records.solo import Solo as SoloRecord
from .schemas.records.solo import SoloSuccessModel as RecordsSoloSuccessModel
from .schemas.summaries import (
AchievementsSuccessModel,
SoloSuccessModel,
SummariesModel,
ZenithSuccessModel,
ZenSuccessModel,
)
from .schemas.summaries import (
SoloSuccessModel as SummariesSoloSuccessModel,
)
from .schemas.summaries.base import User as SummariesUser
from .schemas.user import User
from .schemas.user_info import UserInfo, UserInfoSuccess
from .typing import Summaries
from .typing import Records, Summaries
class RecordModeType(str, Enum):
Sprint = '40l'
Blitz = 'blitz'
class RecordType(str, Enum):
Top = 'top'
Recent = 'recent'
Progression = 'progression'
class RecordKey(NamedTuple):
mode_type: RecordModeType
record_type: RecordType
def to_records(self) -> Records:
return cast(Records, f'{self.mode_type.value}_{self.record_type.value}')
class Player:
__SUMMARIES_MAPPING: MappingProxyType[Summaries, type[SummariesModel]] = MappingProxyType(
{
'40l': SoloSuccessModel,
'blitz': SoloSuccessModel,
'40l': SummariesSoloSuccessModel,
'blitz': SummariesSoloSuccessModel,
'zenith': ZenithSuccessModel,
'zenithex': ZenithSuccessModel,
'zen': ZenSuccessModel,
@@ -58,6 +82,7 @@ class Player:
self.__user: User | None = None
self._user_info: UserInfoSuccess | None = None
self._summaries: dict[Summaries, SummariesModel] = {}
self._records: dict[RecordKey, RecordsSoloSuccessModel] = {}
@property
def _request_user_parameter(self) -> str:
@@ -108,7 +133,7 @@ class Player:
return self._user_info
@overload
async def get_summaries(self, summaries_type: Literal['40l', 'blitz']) -> SoloSuccessModel: ...
async def get_summaries(self, summaries_type: Literal['40l', 'blitz']) -> SummariesSoloSuccessModel: ...
@overload
async def get_summaries(self, summaries_type: Literal['zenith', 'zenithex']) -> ZenithSuccessModel: ...
@overload
@@ -142,12 +167,12 @@ class Player:
@property
@alru_cache
async def sprint(self) -> SoloSuccessModel:
async def sprint(self) -> SummariesSoloSuccessModel:
return await self.get_summaries('40l')
@property
@alru_cache
async def blitz(self) -> SoloSuccessModel:
async def blitz(self) -> SummariesSoloSuccessModel:
return await self.get_summaries('blitz')
@property
@@ -185,3 +210,33 @@ class Player:
if (user := (await self._get_local_summaries_user())) is not None:
return user.banner_revision
return (await self.get_info()).data.banner_revision
async def get_records(self, mode_type: RecordModeType, records_type: RecordType) -> RecordsSoloSuccessModel:
if (record_key := RecordKey(mode_type, records_type)) not in self._records:
raw_records = await Cache.get(
splice_url(
[
BASE_URL,
'users/',
f'{self._request_user_parameter}/',
'records/',
f'{mode_type}/',
records_type,
]
)
)
records: RecordsSoloSuccessModel | FailedModel = type_validate_json(SoloRecord, raw_records) # type: ignore[arg-type]
if isinstance(records, FailedModel):
msg = f'用户Summaries数据请求错误:\n{records.error}'
raise RequestError(msg)
self._records[record_key] = records
await anti_duplicate_add(
TETRIOHistoricalData,
TETRIOHistoricalData(
user_unique_identifier=(await self.user).unique_identifier,
api_type=record_key.to_records(),
data=records,
update_time=records.cache.cached_at,
),
)
return self._records[record_key]