From 8ba3f3c3f44e0bcf65b902a4d881fcc9007d7ad7 Mon Sep 17 00:00:00 2001 From: shoucandanghehe Date: Fri, 10 May 2024 11:13:39 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=A8=20=E6=8B=86=E5=88=86=E5=87=BD?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../io_data_processor/processor.py | 383 +++++++++--------- 1 file changed, 193 insertions(+), 190 deletions(-) diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py index 62a0fc3..1695a70 100644 --- a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py @@ -6,7 +6,7 @@ from hashlib import md5, sha512 from math import ceil, floor from re import match from statistics import mean -from typing import Literal, NamedTuple +from typing import Literal from urllib.parse import urlunparse from zoneinfo import ZoneInfo @@ -43,7 +43,7 @@ from .schemas.response import ProcessedData, RawResponse from .schemas.user import User from .schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague, UserInfo from .schemas.user_info import SuccessModel as InfoSuccess -from .schemas.user_records import SoloRecord, UserRecords +from .schemas.user_records import MultiRecord, SoloRecord, UserRecords from .schemas.user_records import SuccessModel as RecordsSuccess from .typing import Rank @@ -60,6 +60,55 @@ def identify_user_info(info: str) -> User | MessageFormatError: return MessageFormatError('用户名/ID不合法') +def get_value_bounds(values: list[int | float]) -> tuple[int, int]: + value_max = 10 * ceil(max(values) / 10) + value_min = 10 * floor(min(values) / 10) + return value_max, value_min + + +def get_split(value_max: int, value_min: int) -> tuple[int, int]: + offset = 0 + overflow = 0 + + while True: + if (new_max_value := value_max + offset + overflow) > TR_MAX: + overflow -= 1 + continue + if (new_min_value := value_min - offset + overflow) < TR_MIN: + overflow += 1 + continue + if ((new_max_value - new_min_value) / 40).is_integer(): + split_value = int((value_max + offset - (value_min - offset)) / 4) + break + offset += 1 + return split_value, offset + overflow + + +def get_specified_point( + previous_point: TETRIOInfo.TetraLeagueHistory.Data, + behind_point: TETRIOInfo.TetraLeagueHistory.Data, + point_time: datetime, +) -> TETRIOInfo.TetraLeagueHistory.Data: + """根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据 + + Args: + previous_point (TETRIOInfo.TetraLeagueHistory.Data): 前面的数据点 + behind_point (TETRIOInfo.TetraLeagueHistory.Data): 后面的数据点 + point_time (datetime): 要推算的点的位置 + + Returns: + TETRIOInfo.TetraLeagueHistory.Data: 要推算的点的数据 + """ + # 求两个点的斜率 + slope = (behind_point.tr - previous_point.tr) / ( + datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at) + ) + return TETRIOInfo.TetraLeagueHistory.Data( + record_at=point_time, + tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)), + ) + + class Processor(ProcessorMeta): user: User raw_response: RawResponse @@ -125,190 +174,17 @@ class Processor(ProcessorMeta): self.command_type = 'query' await self.get_user() user_info, user_records = await gather(self.get_user_info(), self.get_user_records()) - user_name = user_info.data.user.username.upper() - league = user_info.data.user.league sprint = user_records.data.records.sprint blitz = user_records.data.records.blitz - if isinstance(league, RatedLeague) and league.vs is not None: - today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) - forward = timedelta(days=9) - start_time = (today - forward).astimezone(UTC) - async with get_session() as session: - historical_data = ( - await session.scalars( - select(HistoricalData) - .where(HistoricalData.trigger_time >= start_time) - .where(HistoricalData.game_platform == GAME_TYPE) - .where(HistoricalData.user_unique_identifier == self.user.unique_identifier) - ) - ).all() - if historical_data: - extra = ( - await session.scalars( - select(HistoricalData) - .where(HistoricalData.game_platform == GAME_TYPE) - .where(HistoricalData.user_unique_identifier == self.user.unique_identifier) - .order_by(HistoricalData.id.desc()) - .where(HistoricalData.id < min([i.id for i in historical_data])) - .limit(1) - ) - ).one_or_none() - if extra is not None: - historical_data = list(historical_data) - historical_data.append(extra) - - class HistoricalTr(NamedTuple): - time: datetime - tr: float - - histories = [ - HistoricalTr( - time=i.processed_data.user_info.cache.cached_at.astimezone(ZoneInfo('Asia/Shanghai')), - tr=i.processed_data.user_info.data.user.league.rating, - ) - for i in historical_data - if isinstance(i.processed_data, ProcessedData) - and i.processed_data.user_info is not None - and isinstance(i.processed_data.user_info.data.user.league, RatedLeague) - ] - - def get_specified_point( - previous_point: HistoricalTr, behind_point: HistoricalTr, point_time: datetime - ) -> HistoricalTr: - """根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据 - - Args: - previous_point (HistoricalTr): 前面的数据点 - behind_point (HistoricalTr): 后面的数据点 - point_time (datetime): 要推算的点的位置 - - Returns: - HistoricalTr: 要推算的点的数据 - """ - # 求两个点的斜率 - slope = (behind_point.tr - previous_point.tr) / ( - datetime.timestamp(behind_point.time) - datetime.timestamp(previous_point.time) - ) - return HistoricalTr( - time=point_time, - tr=previous_point.tr - + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.time)), - ) - - # 按照时间排序 - histories = sorted(histories, key=lambda x: x.time) - for index, value in enumerate(histories): - # 在历史记录里找有没有今天0点后的数据 - if value.time > today: - histories = histories[:index] + [ - get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000)) - ] - break - else: - histories.append( - get_specified_point( - histories[-1], - HistoricalTr(user_info.cache.cached_at, league.rating), - today.replace(microsecond=1000), - ) - ) - if histories[0].time < (today - forward): - histories[0] = get_specified_point( - histories[0], - histories[1], - today - forward, - ) - else: - histories.insert(0, HistoricalTr((today - forward), histories[0].tr)) - - def get_value_bounds(values: list[int | float]) -> tuple[int, int]: - value_max = 10 * ceil(max(values) / 10) - value_min = 10 * floor(min(values) / 10) - return value_max, value_min - - def get_split(value_max: int, value_min: int) -> tuple[int, int]: - offset = 0 - overflow = 0 - - while True: - if (new_max_value := value_max + offset + overflow) > TR_MAX: - overflow -= 1 - continue - if (new_min_value := value_min - offset + overflow) < TR_MIN: - overflow += 1 - continue - if ((new_max_value - new_min_value) / 40).is_integer(): - split_value = int((value_max + offset - (value_min - offset)) / 4) - break - offset += 1 - return split_value, offset + overflow - - value_max, value_min = get_value_bounds([i.tr for i in histories]) - split_value, offset = get_split(value_max, value_min) - - if sprint.record is None: - sprint_value = 'N/A' - else: - if not isinstance(sprint.record, SoloRecord): - raise WhatTheFuckError('40L记录不是单人记录') - duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds() - sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004 - if blitz.record is None: - blitz_value = 'N/A' - else: - if not isinstance(blitz.record, SoloRecord): - raise WhatTheFuckError('Blitz记录不是单人记录') - blitz_value = f'{blitz.record.endcontext.score:,}' - async with HostPage( - await render( - 'tetrio/info', - TETRIOInfo( - user=TETRIOInfo.User( - avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}' - if user_info.data.user.avatar_revision is not None - else f'{{"type":"identicon","hash":"{md5(user_info.data.user.id.encode()).hexdigest()}"}}', # noqa: S324 - name=user_name, - bio=user_info.data.user.bio, - ), - ranking=TETRIOInfo.Ranking( - rating=round(league.glicko, 2), - rd=round(league.rd, 2), - ), - tetra_league=TETRIOInfo.TetraLeague( - rank=league.rank, - tr=round(league.rating, 2), - global_rank=league.standing, - pps=league.pps, - lpm=round(lpm := (league.pps * 24), 2), - apm=league.apm, - apl=round(league.apm / lpm, 2), - vs=league.vs, - adpm=round(adpm := (league.vs * 0.6), 2), - adpl=round(adpm / lpm, 2), - ), - tetra_league_history=TETRIOInfo.TetraLeagueHistory( - data=[TETRIOInfo.TetraLeagueHistory.Data(record_at=time, tr=tr) for time, tr in histories], - split_interval=split_value, - min_tr=value_min, - max_tr=value_max, - offset=offset, - ), - radar=TETRIOInfo.Radar( - app=(app := (league.apm / (60 * league.pps))), - dsps=(dsps := ((league.vs / 100) - (league.apm / 60))), - dspp=(dspp := (dsps / league.pps)), - ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25, - ge=2 * ((app * dsps) / league.pps), - ), - sprint=sprint_value, - blitz=blitz_value, - ), - ) - ) as page_hash: - return UniMessage.image( - raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', ''))) - ) + if isinstance(sprint.record, MultiRecord) or isinstance(blitz.record, MultiRecord): + raise WhatTheFuckError('单人游戏记录是多人游戏记录') + try: + return UniMessage.image(raw=await self.make_query_image(self.user, user_info, sprint.record, blitz.record)) + except TypeError: + ... # fallback + league = user_info.data.user.league + user_name = user_info.data.user.username.upper() ret_message = '' if isinstance(league, NeverPlayedLeague): ret_message += f'用户 {user_name} 没有排位统计数据' @@ -329,21 +205,148 @@ class Processor(ProcessorMeta): if league.vs is not None: adpm = league.vs * 0.6 ret_message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )' - user_records = await self.get_user_records() - sprint = user_records.data.records.sprint if sprint.record is not None: - if not isinstance(sprint.record, SoloRecord): - raise WhatTheFuckError('40L记录不是单人记录') ret_message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s' ret_message += f' ( #{sprint.rank} )' if sprint.rank is not None else '' - blitz = user_records.data.records.blitz if blitz.record is not None: - if not isinstance(blitz.record, SoloRecord): - raise WhatTheFuckError('Blitz记录不是单人记录') ret_message += f'\nBlitz: {blitz.record.endcontext.score}' ret_message += f' ( #{blitz.rank} )' if blitz.rank is not None else '' return UniMessage(ret_message) + @staticmethod + async def query_historical_data(user: User, user_info: InfoSuccess) -> list[TETRIOInfo.TetraLeagueHistory.Data]: + today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) + forward = timedelta(days=9) + start_time = (today - forward).astimezone(UTC) + async with get_session() as session: + historical_data = ( + await session.scalars( + select(HistoricalData) + .where(HistoricalData.trigger_time >= start_time) + .where(HistoricalData.game_platform == GAME_TYPE) + .where(HistoricalData.user_unique_identifier == user.unique_identifier) + ) + ).all() + if historical_data: + extra = ( + await session.scalars( + select(HistoricalData) + .where(HistoricalData.game_platform == GAME_TYPE) + .where(HistoricalData.user_unique_identifier == user.unique_identifier) + .order_by(HistoricalData.id.desc()) + .where(HistoricalData.id < min([i.id for i in historical_data])) + .limit(1) + ) + ).one_or_none() + if extra is not None: + historical_data = list(historical_data) + historical_data.append(extra) + + histories = [ + TETRIOInfo.TetraLeagueHistory.Data( + record_at=i.processed_data.user_info.cache.cached_at.astimezone(ZoneInfo('Asia/Shanghai')), + tr=i.processed_data.user_info.data.user.league.rating, + ) + for i in historical_data + if isinstance(i.processed_data, ProcessedData) + and i.processed_data.user_info is not None + and isinstance(i.processed_data.user_info.data.user.league, RatedLeague) + ] + + # 按照时间排序 + histories = sorted(histories, key=lambda x: x.record_at) + for index, value in enumerate(histories): + # 在历史记录里找有没有今天0点后的数据 + if value.record_at > today: + histories = histories[:index] + [ + get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000)) + ] + break + else: + histories.append( + get_specified_point( + histories[-1], + TETRIOInfo.TetraLeagueHistory.Data( + record_at=user_info.cache.cached_at, tr=user_info.data.user.league.rating + ), + today.replace(microsecond=1000), + ) + ) + if histories[0].record_at < (today - forward): + histories[0] = get_specified_point( + histories[0], + histories[1], + today - forward, + ) + else: + histories.insert(0, TETRIOInfo.TetraLeagueHistory.Data(record_at=today - forward, tr=histories[0].tr)) + return histories + + @staticmethod + async def make_query_image( + user: User, user_info: InfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None + ) -> bytes: + league = user_info.data.user.league + if not isinstance(league, RatedLeague) or league.vs is None: + raise TypeError + user_name = user_info.data.user.username.upper() + histories = await Processor.query_historical_data(user, user_info) + value_max, value_min = get_value_bounds([i.tr for i in histories]) + split_value, offset = get_split(value_max, value_min) + if sprint is not None: + duration = timedelta(milliseconds=sprint.endcontext.final_time).total_seconds() + sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004 + else: + sprint_value = 'N/A' + blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A' + async with HostPage( + await render( + 'tetrio/info', + TETRIOInfo( + user=TETRIOInfo.User( + avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}' + if user_info.data.user.avatar_revision is not None + else f'{{"type":"identicon","hash":"{md5(user_info.data.user.id.encode()).hexdigest()}"}}', # noqa: S324 + name=user_name, + bio=user_info.data.user.bio, + ), + ranking=TETRIOInfo.Ranking( + rating=round(league.glicko, 2), + rd=round(league.rd, 2), + ), + tetra_league=TETRIOInfo.TetraLeague( + rank=league.rank, + tr=round(league.rating, 2), + global_rank=league.standing, + pps=league.pps, + lpm=round(lpm := (league.pps * 24), 2), + apm=league.apm, + apl=round(league.apm / lpm, 2), + vs=league.vs, + adpm=round(adpm := (league.vs * 0.6), 2), + adpl=round(adpm / lpm, 2), + ), + tetra_league_history=TETRIOInfo.TetraLeagueHistory( + data=histories, + split_interval=split_value, + min_tr=value_min, + max_tr=value_max, + offset=offset, + ), + radar=TETRIOInfo.Radar( + app=(app := (league.apm / (60 * league.pps))), + dsps=(dsps := ((league.vs / 100) - (league.apm / 60))), + dspp=(dspp := (dsps / league.pps)), + ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25, + ge=2 * ((app * dsps) / league.pps), + ), + sprint=sprint_value, + blitz=blitz_value, + ), + ) + ) as page_hash: + return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', ''))) + async def get_user(self) -> None: """ 用于获取 UserName 和 UserID 的函数