diff --git a/nonebot_plugin_tetris_stats/games/tetrio/api/schemas/summaries/zenith.py b/nonebot_plugin_tetris_stats/games/tetrio/api/schemas/summaries/zenith.py index 337fc3a..81f484b 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/api/schemas/summaries/zenith.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/api/schemas/summaries/zenith.py @@ -86,7 +86,7 @@ class Record(BaseModel): pb: bool oncepb: bool ts: datetime - revolution: None + revolution: str | None user: User otherusers: list leaderboards: list[str] @@ -97,7 +97,7 @@ class Record(BaseModel): class Best(BaseModel): - record: None # WTF + record: Record | None rank: int diff --git a/nonebot_plugin_tetris_stats/games/tetrio/list.py b/nonebot_plugin_tetris_stats/games/tetrio/list.py index d0a2e66..4d18fd8 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/list.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/list.py @@ -5,9 +5,10 @@ from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[im from ...db import trigger from ...utils.host import HostPage, get_self_netloc +from ...utils.lang import get_lang from ...utils.metrics import get_metrics from ...utils.render import render -from ...utils.render.schemas.tetrio.user.list_v2 import List, TetraLeague, User +from ...utils.render.schemas.v2.tetrio.user.list import Data, List, TetraLeague, User from ...utils.screenshot import screenshot from .. import alc from . import command @@ -63,12 +64,15 @@ async def _( 'v2/tetrio/user/list', List( show_index=True, - users=[ - User( - id=i.id, - name=i.username.upper(), - avatar=f'https://tetr.io/user-content/avatars/{i.id}.jpg', - country=i.country, + data=[ + Data( + user=User( + id=i.id, + name=i.username.upper(), + avatar=f'https://tetr.io/user-content/avatars/{i.id}.jpg', + country=i.country, + xp=i.xp, + ), tetra_league=TetraLeague( rank=i.league.rank, tr=round(i.league.tr, 2), @@ -81,12 +85,11 @@ async def _( vs=metrics.vs, adpl=metrics.adpl, ), - xp=i.xp, - join_at=None, ) for i in league.data.entries if isinstance(i, Entry) ], + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/query/tools.py b/nonebot_plugin_tetris_stats/games/tetrio/query/tools.py index 2cb30f5..5d90a80 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/query/tools.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/query/tools.py @@ -4,22 +4,22 @@ from typing import TypeVar, overload from zoneinfo import ZoneInfo from ....utils.exception import FallbackError -from ....utils.render.schemas.tetrio.user.base import TetraLeagueHistoryData +from ....utils.render.schemas.base import HistoryData from ..api.schemas.labs.leagueflow import Empty, LeagueFlowSuccess from ..api.schemas.summaries.league import InvalidData, LeagueSuccessModel, NeverPlayedData, NeverRatedData, RatedData def flow_to_history( leagueflow: LeagueFlowSuccess, - handle: Callable[[list[TetraLeagueHistoryData]], list[TetraLeagueHistoryData]] | None = None, -) -> list[TetraLeagueHistoryData]: + handle: Callable[[list[HistoryData]], list[HistoryData]] | None = None, +) -> list[HistoryData]: if isinstance(leagueflow.data, Empty): raise FallbackError start_time = leagueflow.data.start_time.astimezone(ZoneInfo('Asia/Shanghai')) ret = [ - TetraLeagueHistoryData( + HistoryData( record_at=start_time + timedelta(milliseconds=i.timestamp_offset), - tr=i.post_match_tr, + score=i.post_match_tr, ) for i in leagueflow.data.points if start_time + timedelta(milliseconds=i.timestamp_offset) diff --git a/nonebot_plugin_tetris_stats/games/tetrio/query/v1.py b/nonebot_plugin_tetris_stats/games/tetrio/query/v1.py index c1d1be9..457e475 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/query/v1.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/query/v1.py @@ -1,17 +1,17 @@ from asyncio import gather -from datetime import datetime, timedelta +from datetime import timedelta from hashlib import md5 -from math import ceil, floor -from zoneinfo import ZoneInfo from yarl import URL -from ....utils.exception import FallbackError, WhatTheFuckError +from ....utils.chart import get_split, get_value_bounds, handle_history_data +from ....utils.exception import FallbackError from ....utils.host import HostPage, get_self_netloc +from ....utils.metrics import get_metrics from ....utils.render import render -from ....utils.render.schemas.base import Avatar, Ranking -from ....utils.render.schemas.tetrio.user.base import TetraLeagueHistoryData -from ....utils.render.schemas.tetrio.user.info_v1 import Info, Radar, TetraLeague, TetraLeagueHistory, User +from ....utils.render.schemas.base import Avatar, Trending +from ....utils.render.schemas.v1.base import History +from ....utils.render.schemas.v1.tetrio.user.info import Info, Multiplayer, Singleplayer, User from ....utils.screenshot import screenshot from ..api import Player from ..api.schemas.summaries.league import RatedData @@ -19,109 +19,6 @@ from ..constant import TR_MAX, TR_MIN from .tools import flow_to_history, get_league_data -def get_value_bounds(values: list[int | float]) -> tuple[int, int]: - value_max = 10 * ceil(max(values) / 10) - value_min = 10 * floor(min(values) / 10) - return value_max, value_min - - -def get_split(value_max: int, value_min: int) -> tuple[int, int]: - offset = 0 - overflow = 0 - - while True: - if (new_max_value := value_max + offset + overflow) > TR_MAX: - overflow -= 1 - continue - if (new_min_value := value_min - offset + overflow) < TR_MIN: - overflow += 1 - continue - if ((new_max_value - new_min_value) / 40).is_integer(): - split_value = int((value_max + offset - (value_min - offset)) / 4) - break - offset += 1 - return split_value, offset + overflow - - -def get_specified_point( - previous_point: TetraLeagueHistoryData, - behind_point: TetraLeagueHistoryData, - point_time: datetime, -) -> TetraLeagueHistoryData: - """根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据 - - Args: - previous_point (Data): 前面的数据点 - behind_point (Data): 后面的数据点 - point_time (datetime): 要推算的点的位置 - - Returns: - Data: 要推算的点的数据 - """ - # 求两个点的斜率 - slope = (behind_point.tr - previous_point.tr) / ( - datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at) - ) - return TetraLeagueHistoryData( - record_at=point_time, - tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)), - ) - - -def handle_history_data(data: list[TetraLeagueHistoryData]) -> list[TetraLeagueHistoryData]: # noqa: C901, PLR0912 - # 按照 记录时间 对数据进行排序 - data.sort(key=lambda x: x.record_at) - - # 定义时间边界, 右边界为当前时间的当天零点, 左边界为右边界前推9天 - # 返回值的[0]和[-1]分别应满足left_border和right_border - zero = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) - left_border = zero - timedelta(days=9) - right_border = zero.replace(microsecond=1000) - - lefts: list[TetraLeagueHistoryData] = [] - in_border: list[TetraLeagueHistoryData] = [] - rights: list[TetraLeagueHistoryData] = [] - - # 根据 记录时间 将数据分类到对应的列表中 - for i in data: - if i.record_at < left_border: - lefts.append(i) - elif i.record_at < right_border: - in_border.append(i) - else: - rights.append(i) - - ret: list[TetraLeagueHistoryData] = [] - - # 处理左边界的点 - if lefts and in_border: # 如果边界左侧和边界内都有值则推算 - ret.append(get_specified_point(lefts[-1], in_border[0], left_border)) - elif lefts and not in_border: # 如果边界左侧有值但是边界内没有值则直接取左侧的最后一个值 - ret.append(TetraLeagueHistoryData(tr=lefts[-1].tr, record_at=left_border)) - elif not lefts and in_border: # 如果边界左侧没有值但是边界内有值则直接取边界内的第一个值 - ret.append(TetraLeagueHistoryData(tr=in_border[0].tr, record_at=left_border)) - elif not lefts and not in_border and rights: # 如果边界左侧和边界内都没有值但是边界右侧有值则直接取边界右侧的第一个值 # fmt: skip - ret.append(TetraLeagueHistoryData(tr=rights[0].tr, record_at=left_border)) - else: # 暂时没想到其他情况 - raise WhatTheFuckError - - # 添加边界内数据 - ret.extend(in_border) - - # 处理右边界的点 - if in_border and rights: # 如果边界内和边界右侧都有值则推算 - ret.append(get_specified_point(in_border[-1], rights[0], right_border)) - elif not in_border and rights: # 如果边界内没有值但是边界右侧有值则直接取右侧的第一个值 - ret.append(TetraLeagueHistoryData(tr=rights[0].tr, record_at=right_border)) - elif in_border and not rights: # 如果边界内有值但是边界右侧没有值则直接取边界内的最后一个值 - ret.append(TetraLeagueHistoryData(tr=in_border[-1].tr, record_at=right_border)) - elif not in_border and not rights and lefts: # 如果边界内和边界右侧都没有值但是边界左侧有值则直接取边界左侧的最后一个值 # fmt: skip - ret.append(TetraLeagueHistoryData(tr=lefts[-1].tr, record_at=right_border)) - else: # 暂时没想到其他情况 - raise WhatTheFuckError - return ret - - async def make_query_image_v1(player: Player) -> bytes: ( (user, user_info, league, sprint, blitz, leagueflow), @@ -134,8 +31,8 @@ async def make_query_image_v1(player: Player) -> bytes: if league_data.vs is None: raise FallbackError histories = flow_to_history(leagueflow, handle_history_data) - value_max, value_min = get_value_bounds([i.tr for i in histories]) - split_value, offset = get_split(value_max, value_min) + values = get_value_bounds([i.score for i in histories]) + split_value, offset = get_split(values, TR_MAX, TR_MIN) if sprint.data.record is not None: duration = timedelta(milliseconds=sprint.data.record.results.stats.finaltime).total_seconds() sprint_value = f'{duration:.3f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.3f}s' # noqa: PLR2004 @@ -143,6 +40,9 @@ async def make_query_image_v1(player: Player) -> bytes: sprint_value = 'N/A' blitz_value = f'{blitz.data.record.results.stats.score:,}' if blitz.data.record is not None else 'N/A' netloc = get_self_netloc() + dsps: float + dspp: float + # make mypy happy async with HostPage( page=await render( 'v1/tetrio/info', @@ -159,38 +59,40 @@ async def make_query_image_v1(player: Player) -> bytes: name=user.name.upper(), bio=user_info.data.bio, ), - ranking=Ranking( - rating=round(league_data.glicko, 2), + multiplayer=Multiplayer( + glicko=f'{round(league_data.glicko, 2):,}', rd=round(league_data.rd, 2), - ), - tetra_league=TetraLeague( rank=league_data.rank, - tr=round(league_data.tr, 2), + tr=f'{round(league_data.tr, 2):,}', global_rank=league_data.standing, - pps=league_data.pps, - lpm=round(lpm := (league_data.pps * 24), 2), - apm=league_data.apm, - apl=round(league_data.apm / lpm, 2), - vs=league_data.vs, - adpm=round(adpm := (league_data.vs * 0.6), 2), - adpl=round(adpm / lpm, 2), - ), - tetra_league_history=TetraLeagueHistory( - data=histories, - split_interval=split_value, - min_tr=value_min, - max_tr=value_max, - offset=offset, - ), - radar=Radar( + history=History( + data=histories, + split_interval=split_value, + min_value=values.value_min, + max_value=values.value_max, + offset=offset, + ), + lpm=(metrics := get_metrics(pps=league_data.pps, apm=league_data.apm, vs=league_data.vs)).lpm, + pps=metrics.pps, + lpm_trending=Trending.KEEP, + apm=metrics.apm, + apl=metrics.apl, + apm_trending=Trending.KEEP, + adpm=metrics.adpm, + vs=metrics.vs, + adpl=metrics.adpl, + adpm_trending=Trending.KEEP, app=(app := (league_data.apm / (60 * league_data.pps))), dsps=(dsps := ((league_data.vs / 100) - (league_data.apm / 60))), dspp=(dspp := (dsps / league_data.pps)), ci=150 * dspp - 125 * app + 50 * (league_data.vs / league_data.apm) - 25, ge=2 * ((app * dsps) / league_data.pps), ), - sprint=sprint_value, - blitz=blitz_value, + singleplayer=Singleplayer( + sprint=sprint_value, + blitz=blitz_value, + ), + _lang='zh-CN', ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/query/v2.py b/nonebot_plugin_tetris_stats/games/tetrio/query/v2.py index 1577072..dcf9638 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/query/v2.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/query/v2.py @@ -6,11 +6,13 @@ from yarl import URL from ....utils.exception import FallbackError from ....utils.host import HostPage, get_self_netloc +from ....utils.lang import get_lang from ....utils.metrics import get_metrics from ....utils.render import render from ....utils.render.schemas.base import Avatar -from ....utils.render.schemas.tetrio.user.info_v2 import ( +from ....utils.render.schemas.v2.tetrio.user.info import ( Badge, + Best, Blitz, Info, Sprint, @@ -18,7 +20,9 @@ from ....utils.render.schemas.tetrio.user.info_v2 import ( TetraLeague, TetraLeagueStatistic, User, + Week, Zen, + Zenith, ) from ....utils.screenshot import screenshot from ..api import Player @@ -29,10 +33,23 @@ from .tools import flow_to_history, handling_special_value async def make_query_image_v2(player: Player) -> bytes: ( (user, user_info, league, sprint, blitz, zen), - (avatar_revision, banner_revision, leagueflow), + (avatar_revision, banner_revision, leagueflow, zenith, zenithex), ) = await gather( - gather(player.user, player.get_info(), player.league, player.sprint, player.blitz, player.zen), - gather(player.avatar_revision, player.banner_revision, player.get_leagueflow()), + gather( + player.user, + player.get_info(), + player.league, + player.sprint, + player.blitz, + player.zen, + ), + gather( + player.avatar_revision, + player.banner_revision, + player.get_leagueflow(), + player.get_summaries('zenith'), + player.get_summaries('zenithex'), + ), ) if sprint.data.record is not None: duration = timedelta(milliseconds=sprint.data.record.results.stats.finaltime).total_seconds() @@ -62,12 +79,9 @@ async def make_query_image_v2(player: Player) -> bytes: user=User( id=user.ID, name=user.name.upper(), - bio=user_info.data.bio, - banner=str( - URL(f'http://{netloc}/host/resource/tetrio/banners/{user.ID}') % {'revision': banner_revision} - ) - if banner_revision is not None and banner_revision != 0 - else None, + country=user_info.data.country, + role=user_info.data.role, + botmaster=user_info.data.botmaster, avatar=str( URL(f'http://{netloc}/host/resource/tetrio/avatars/{user.ID}') % {'revision': avatar_revision} ) @@ -76,6 +90,15 @@ async def make_query_image_v2(player: Player) -> bytes: type='identicon', hash=md5(user.ID.encode()).hexdigest(), # noqa: S324 ), + banner=str( + URL(f'http://{netloc}/host/resource/tetrio/banners/{user.ID}') % {'revision': banner_revision} + ) + if banner_revision is not None and banner_revision != 0 + else None, + bio=user_info.data.bio, + friend_count=user_info.data.friend_count, + supporter_tier=user_info.data.supporter_tier, + bad_standing=user_info.data.badstanding or False, badges=[ Badge( id=i.id, @@ -85,12 +108,9 @@ async def make_query_image_v2(player: Player) -> bytes: ) for i in user_info.data.badges ], - country=user_info.data.country, - role=user_info.data.role, xp=user_info.data.xp, - friend_count=user_info.data.friend_count, - supporter_tier=user_info.data.supporter_tier, - bad_standing=user_info.data.badstanding or False, + ar=user_info.data.ar, + achievements=user_info.data.achievements, playtime=play_time, join_at=user_info.data.ts, ), @@ -113,6 +133,40 @@ async def make_query_image_v2(player: Player) -> bytes: ) if not isinstance(league.data, NeverPlayedData | InvalidData) else None, + zenith=Zenith( + week=Week( + altitude=zenith.data.record.results.stats.zenith.altitude, + global_rank=zenith.data.rank, + country_rank=zenith.data.rank_local, + play_at=zenith.data.record.ts, + ) + if zenith.data.record is not None + else None, + best=Best( + altitude=zenith.data.best.record.results.stats.zenith.altitude, + global_rank=zenith.data.best.rank, + play_at=zenith.data.best.record.ts, + ) + if zenith.data.best.record is not None + else None, + ), + zenithex=Zenith( + week=Week( + altitude=zenithex.data.record.results.stats.zenith.altitude, + global_rank=zenithex.data.rank, + country_rank=zenithex.data.rank_local, + play_at=zenithex.data.record.ts, + ) + if zenithex.data.record is not None + else None, + best=Best( + altitude=zenithex.data.best.record.results.stats.zenith.altitude, + global_rank=zenithex.data.best.rank, + play_at=zenithex.data.best.record.ts, + ) + if zenithex.data.best.record is not None + else None, + ), statistic=Statistic( total=handling_special_value(user_info.data.gamesplayed), wins=handling_special_value(user_info.data.gameswon), @@ -120,6 +174,7 @@ async def make_query_image_v2(player: Player) -> bytes: sprint=Sprint( time=sprint_value, global_rank=sprint.data.rank, + country_rank=sprint.data.rank_local, play_at=sprint.data.record.ts, ) if sprint.data.record is not None @@ -127,11 +182,13 @@ async def make_query_image_v2(player: Player) -> bytes: blitz=Blitz( score=blitz.data.record.results.stats.score, global_rank=blitz.data.rank, + country_rank=blitz.data.rank_local, play_at=blitz.data.record.ts, ) if blitz.data.record is not None else None, zen=Zen(level=zen.data.level, score=zen.data.score), + _lang=get_lang(), ), ), ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/rank/all.py b/nonebot_plugin_tetris_stats/games/tetrio/rank/all.py index b9a6db8..53711ee 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/rank/all.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/rank/all.py @@ -10,13 +10,14 @@ from sqlalchemy.orm import selectinload from ....db import trigger from ....utils.host import HostPage, get_self_netloc +from ....utils.lang import get_lang from ....utils.metrics import get_metrics from ....utils.render import render -from ....utils.render.schemas.tetrio.rank.v1 import Data as DataV1 -from ....utils.render.schemas.tetrio.rank.v1 import ItemData as ItemDataV1 -from ....utils.render.schemas.tetrio.rank.v2 import AverageData as AverageDataV2 -from ....utils.render.schemas.tetrio.rank.v2 import Data as DataV2 -from ....utils.render.schemas.tetrio.rank.v2 import ItemData as ItemDataV2 +from ....utils.render.schemas.v1.tetrio.rank import Data as DataV1 +from ....utils.render.schemas.v1.tetrio.rank import ItemData as ItemDataV1 +from ....utils.render.schemas.v2.tetrio.rank import AverageData as AverageDataV2 +from ....utils.render.schemas.v2.tetrio.rank import Data as DataV2 +from ....utils.render.schemas.v2.tetrio.rank import ItemData as ItemDataV2 from ....utils.screenshot import screenshot from .. import alc from ..constant import GAME_TYPE @@ -82,6 +83,7 @@ async def make_image_v1(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeag for i in zip(latest_data.fields, compare_data.fields, strict=True) }, updated_at=latest_data.update_time, + _lang=get_lang(), ), ) ) as page_hash: @@ -109,6 +111,7 @@ async def make_image_v2(latest_data: TETRIOLeagueStats, compare_data: TETRIOLeag for i in zip(latest_data.fields, compare_data.fields, strict=True) }, updated_at=latest_data.update_time, + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py b/nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py index 90b7f05..d361850 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/rank/detail.py @@ -12,9 +12,10 @@ from sqlalchemy.orm import selectinload from ....db import trigger from ....utils.host import HostPage, get_self_netloc +from ....utils.lang import get_lang from ....utils.metrics import get_metrics from ....utils.render import render -from ....utils.render.schemas.tetrio.rank.detail import Data, SpecialData +from ....utils.render.schemas.v2.tetrio.rank.detail import Data, SpecialData from ....utils.screenshot import screenshot from .. import alc from ..api.typedefs import ValidRank @@ -122,6 +123,7 @@ async def make_image(rank: ValidRank, latest: TETRIOLeagueStats, compare: TETRIO vs_holder=latest_data.high_vs.username.upper(), ), updated_at=latest.update_time.replace(tzinfo=UTC).astimezone(ZoneInfo('Asia/Shanghai')), + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/record/blitz.py b/nonebot_plugin_tetris_stats/games/tetrio/record/blitz.py index fbd8200..a766bcc 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/record/blitz.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/record/blitz.py @@ -16,11 +16,12 @@ from ....db import query_bind_info, trigger from ....i18n import Lang from ....utils.exception import RecordNotFoundError from ....utils.host import HostPage, get_self_netloc +from ....utils.lang import get_lang from ....utils.metrics import get_metrics from ....utils.render import render from ....utils.render.schemas.base import Avatar -from ....utils.render.schemas.tetrio.record.base import Finesse, Max, Mini, Tspins, User -from ....utils.render.schemas.tetrio.record.blitz import Record, Statistic +from ....utils.render.schemas.v2.tetrio.record.base import Finesse, Max, Mini, Tspins, User +from ....utils.render.schemas.v2.tetrio.record.blitz import Record, Statistic from ....utils.screenshot import screenshot from ....utils.typedefs import Me from .. import alc @@ -145,6 +146,7 @@ async def make_blitz_image(player: Player) -> bytes: level=stats.level, ), play_at=blitz.data.record.ts, + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tetrio/record/sprint.py b/nonebot_plugin_tetris_stats/games/tetrio/record/sprint.py index fd98b70..f9cb52e 100644 --- a/nonebot_plugin_tetris_stats/games/tetrio/record/sprint.py +++ b/nonebot_plugin_tetris_stats/games/tetrio/record/sprint.py @@ -16,11 +16,12 @@ from ....db import query_bind_info, trigger from ....i18n import Lang from ....utils.exception import RecordNotFoundError from ....utils.host import HostPage, get_self_netloc +from ....utils.lang import get_lang from ....utils.metrics import get_metrics from ....utils.render import render from ....utils.render.schemas.base import Avatar -from ....utils.render.schemas.tetrio.record.base import Finesse, Max, Mini, Statistic, Tspins, User -from ....utils.render.schemas.tetrio.record.sprint import Record +from ....utils.render.schemas.v2.tetrio.record.base import Finesse, Max, Mini, Statistic, Tspins, User +from ....utils.render.schemas.v2.tetrio.record.sprint import Record from ....utils.screenshot import screenshot from ....utils.typedefs import Me from .. import alc @@ -90,7 +91,7 @@ async def make_sprint_image(player: Player) -> bytes: netloc = get_self_netloc() async with HostPage( page=await render( - 'v2/tetrio/record/40l', + 'v2/tetrio/record/sprint', Record( type='best', user=User( @@ -145,6 +146,7 @@ async def make_sprint_image(player: Player) -> bytes: ), ), play_at=sprint.data.record.ts, + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/top/query.py b/nonebot_plugin_tetris_stats/games/top/query.py index b7bd06c..fcb60a6 100644 --- a/nonebot_plugin_tetris_stats/games/top/query.py +++ b/nonebot_plugin_tetris_stats/games/top/query.py @@ -11,12 +11,13 @@ from ...db import query_bind_info, trigger from ...i18n import Lang from ...utils.exception import FallbackError from ...utils.host import HostPage, get_self_netloc +from ...utils.lang import get_lang from ...utils.metrics import TetrisMetricsBasicWithLPM, get_metrics from ...utils.render import render from ...utils.render.avatar import get_avatar -from ...utils.render.schemas.base import People -from ...utils.render.schemas.top_info import Data as InfoData -from ...utils.render.schemas.top_info import Info +from ...utils.render.schemas.base import People, Trending +from ...utils.render.schemas.v1.top.info import Data as InfoData +from ...utils.render.schemas.v1.top.info import Info from ...utils.screenshot import screenshot from ...utils.typedefs import Me from . import alc @@ -79,8 +80,23 @@ async def make_query_image(profile: UserProfile) -> bytes: 'v1/top/info', Info( user=People(avatar=get_avatar(profile.user_name), name=profile.user_name), - today=InfoData(pps=today.pps, lpm=today.lpm, apm=today.apm, apl=today.apl), - history=InfoData(pps=history.pps, lpm=history.lpm, apm=history.apm, apl=history.apl), + today=InfoData( + pps=today.pps, + lpm=today.lpm, + lpm_trending=Trending.KEEP, + apm=today.apm, + apl=today.apl, + apm_trending=Trending.KEEP, + ), + historical=InfoData( + pps=history.pps, + lpm=history.lpm, + lpm_trending=Trending.KEEP, + apm=history.apm, + apl=history.apl, + apm_trending=Trending.KEEP, + ), + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/games/tos/query.py b/nonebot_plugin_tetris_stats/games/tos/query.py index 04bfce4..5e3b577 100644 --- a/nonebot_plugin_tetris_stats/games/tos/query.py +++ b/nonebot_plugin_tetris_stats/games/tos/query.py @@ -1,7 +1,8 @@ from asyncio import gather -from datetime import timedelta +from datetime import datetime, timedelta, timezone from http import HTTPStatus from typing import Literal, NamedTuple +from zoneinfo import ZoneInfo from nonebot.adapters import Event from nonebot.matcher import Matcher @@ -12,21 +13,27 @@ from nonebot_plugin_session import EventSession from nonebot_plugin_session_orm import get_session_persist_id # type: ignore[import-untyped] from nonebot_plugin_user import get_user from nonebot_plugin_userinfo import EventUserInfo, UserInfo +from sqlalchemy import select from ...db import query_bind_info, trigger from ...i18n import Lang +from ...utils.chart import get_split, get_value_bounds, handle_history_data from ...utils.exception import RequestError from ...utils.host import HostPage, get_self_netloc from ...utils.image import get_avatar +from ...utils.lang import get_lang from ...utils.metrics import TetrisMetricsProWithLPMADPM, get_metrics from ...utils.render import render from ...utils.render.avatar import get_avatar as get_random_avatar -from ...utils.render.schemas.base import People, Ranking -from ...utils.render.schemas.tos_info import Info, Multiplayer, Radar +from ...utils.render.schemas.base import HistoryData, People, Trending +from ...utils.render.schemas.v1.base import History +from ...utils.render.schemas.v1.tos.info import Info, Multiplayer, Singleplayer from ...utils.screenshot import screenshot +from ...utils.time_it import time_it from ...utils.typedefs import Me, Number from . import alc from .api import Player +from .api.models import TOSHistoricalData from .api.schemas.user_info import UserInfoSuccess from .constant import GAME_TYPE @@ -148,6 +155,7 @@ async def _(account: Player, event_session: EventSession): command_args=[], ): user_info, game_data = await gather(account.get_info(), get_game_data(account)) + await get_historical_data(user_info.data.teaid) if game_data is not None: await UniMessage.image(raw=await make_query_image(user_info, game_data, None)).finish() await make_query_text(user_info, game_data).finish() @@ -156,7 +164,7 @@ async def _(account: Player, event_session: EventSession): class GameData(NamedTuple): game_num: int metrics: TetrisMetricsProWithLPMADPM - OR: Number + or_: Number dspp: Number ge: Number @@ -199,12 +207,48 @@ async def get_game_data(player: Player, query_num: int = 50) -> GameData | None: return GameData( game_num=num, metrics=metrics, - OR=total_offset / total_receive * 100, + or_=total_offset / total_receive * 100, dspp=total_dig / total_pieses, ge=2 * ((total_attack * total_dig) / total_pieses**2), ) +@time_it +async def get_historical_data(unique_identifier: str) -> list[HistoryData]: + async with get_session() as session: + user_infos = ( + await session.scalars( + select(TOSHistoricalData) + .where(TOSHistoricalData.user_unique_identifier == unique_identifier) + .where(TOSHistoricalData.api_type == 'User Info') + .where( + TOSHistoricalData.update_time + > ( + datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) + - timedelta(days=9) + ).replace(tzinfo=timezone.utc) + ) + .order_by(TOSHistoricalData.id.asc()) + ) + ).all() + extra_info = ( + await session.scalars( + select(TOSHistoricalData) + .where(TOSHistoricalData.id < user_infos[0].id) + .where(TOSHistoricalData.user_unique_identifier == unique_identifier) + .where(TOSHistoricalData.api_type == 'User Info') + .limit(1) + ) + ).one_or_none() + if extra_info is not None: + user_infos = [extra_info, *user_infos] + return [ + HistoryData(score=float(i.data.data.rating_now), record_at=i.update_time.astimezone(ZoneInfo('Asia/Shanghai'))) + for i in user_infos + if isinstance(i.data, UserInfoSuccess) + ] + + async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, event_user_info: UserInfo | None) -> bytes: metrics = game_data.metrics sprint_value = ( @@ -226,26 +270,38 @@ async def make_query_image(user_info: UserInfoSuccess, game_data: GameData, even else get_random_avatar(user_info.data.teaid), name=user_info.data.name, ), - ranking=Ranking(rating=float(user_info.data.ranking), rd=round(float(user_info.data.rd_now), 2)), multiplayer=Multiplayer( - pps=metrics.pps, + history=History( + data=(data := handle_history_data(await get_historical_data(user_info.data.teaid))), + max_value=(values := get_value_bounds([i.score for i in data])).value_max, + min_value=values.value_min, + split_interval=(split := get_split(value_bound=values, min_value=0)).split_value, + offset=split.offset, + ), + rating=round(float(user_info.data.rating_now), 2), + rd=round(float(user_info.data.rd_now), 2), lpm=metrics.lpm, + pps=metrics.pps, + lpm_trending=Trending.KEEP, apm=metrics.apm, apl=metrics.apl, - vs=metrics.vs, + apm_trending=Trending.KEEP, adpm=metrics.adpm, + vs=metrics.vs, adpl=metrics.adpl, - ), - radar=Radar( + adpm_trending=Trending.KEEP, app=(app := (metrics.apm / (60 * metrics.pps))), - OR=game_data.OR, + or_=game_data.or_, dspp=game_data.dspp, ci=150 * game_data.dspp - 125 * app + 50 * (metrics.vs / metrics.apm) - 25, ge=game_data.ge, ), - sprint=sprint_value, - challenge=f'{int(user_info.data.pb_challenge):,}' if user_info.data.pb_challenge != '0' else 'N/A', - marathon=f'{int(user_info.data.pb_marathon):,}' if user_info.data.pb_marathon != '0' else 'N/A', + singleplayer=Singleplayer( + sprint=sprint_value, + challenge=f'{int(user_info.data.pb_challenge):,}' if user_info.data.pb_challenge != '0' else 'N/A', + marathon=f'{int(user_info.data.pb_marathon):,}' if user_info.data.pb_marathon != '0' else 'N/A', + ), + _lang=get_lang(), ), ) ) as page_hash: diff --git a/nonebot_plugin_tetris_stats/utils/chart.py b/nonebot_plugin_tetris_stats/utils/chart.py new file mode 100644 index 0000000..17e3661 --- /dev/null +++ b/nonebot_plugin_tetris_stats/utils/chart.py @@ -0,0 +1,122 @@ +from datetime import datetime, timedelta +from math import ceil, floor, inf +from typing import NamedTuple +from zoneinfo import ZoneInfo + +from .exception import WhatTheFuckError +from .render.schemas.base import HistoryData +from .typedefs import Number + + +class ValueBound(NamedTuple): + value_max: int + value_min: int + + +class Split(NamedTuple): + split_value: int + offset: int + + +def get_value_bounds(values: list[int | float]) -> ValueBound: + value_max = 10 * ceil(max(values) / 10) + value_min = 10 * floor(min(values) / 10) + return ValueBound(value_max, value_min) + + +def get_split(value_bound: ValueBound, max_value: Number = inf, min_value: Number = -inf) -> Split: + offset = 0 + overflow = 0 + + while True: + if (new_max_value := value_bound.value_max + offset + overflow) > max_value: + overflow -= 1 + continue + if (new_min_value := value_bound.value_min - offset + overflow) < min_value: + overflow += 1 + continue + if ((new_max_value - new_min_value) / 40).is_integer(): + split_value = int((value_bound.value_max + offset - (value_bound.value_min - offset)) / 4) + break + offset += 1 + return Split(split_value, offset + overflow) + + +def get_specified_point( + previous_point: HistoryData, + behind_point: HistoryData, + point_time: datetime, +) -> HistoryData: + """根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据 + + Args: + previous_point (Data): 前面的数据点 + behind_point (Data): 后面的数据点 + point_time (datetime): 要推算的点的位置 + + Returns: + Data: 要推算的点的数据 + """ + # 求两个点的斜率 + slope = (behind_point.score - previous_point.score) / ( + datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at) + ) + return HistoryData( + record_at=point_time, + score=previous_point.score + + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)), + ) + + +def handle_history_data(data: list[HistoryData]) -> list[HistoryData]: # noqa: C901, PLR0912 + # 按照 记录时间 对数据进行排序 + data.sort(key=lambda x: x.record_at) + + # 定义时间边界, 右边界为当前时间的当天零点, 左边界为右边界前推9天 + # 返回值的[0]和[-1]分别应满足left_border和right_border + zero = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0) + left_border = zero - timedelta(days=9) + right_border = zero.replace(microsecond=1000) + + lefts: list[HistoryData] = [] + in_border: list[HistoryData] = [] + rights: list[HistoryData] = [] + + # 根据 记录时间 将数据分类到对应的列表中 + for i in data: + if i.record_at < left_border: + lefts.append(i) + elif i.record_at < right_border: + in_border.append(i) + else: + rights.append(i) + + ret: list[HistoryData] = [] + + # 处理左边界的点 + if lefts and in_border: # 如果边界左侧和边界内都有值则推算 + ret.append(get_specified_point(lefts[-1], in_border[0], left_border)) + elif lefts and not in_border: # 如果边界左侧有值但是边界内没有值则直接取左侧的最后一个值 + ret.append(HistoryData(score=lefts[-1].score, record_at=left_border)) + elif not lefts and in_border: # 如果边界左侧没有值但是边界内有值则直接取边界内的第一个值 + ret.append(HistoryData(score=in_border[0].score, record_at=left_border)) + elif not lefts and not in_border and rights: # 如果边界左侧和边界内都没有值但是边界右侧有值则直接取边界右侧的第一个值 # fmt: skip + ret.append(HistoryData(score=rights[0].score, record_at=left_border)) + else: # 暂时没想到其他情况 + raise WhatTheFuckError + + # 添加边界内数据 + ret.extend(in_border) + + # 处理右边界的点 + if in_border and rights: # 如果边界内和边界右侧都有值则推算 + ret.append(get_specified_point(in_border[-1], rights[0], right_border)) + elif not in_border and rights: # 如果边界内没有值但是边界右侧有值则直接取右侧的第一个值 + ret.append(HistoryData(score=rights[0].score, record_at=right_border)) + elif in_border and not rights: # 如果边界内有值但是边界右侧没有值则直接取边界内的最后一个值 + ret.append(HistoryData(score=in_border[-1].score, record_at=right_border)) + elif not in_border and not rights and lefts: # 如果边界内和边界右侧都没有值但是边界左侧有值则直接取边界左侧的最后一个值 # fmt: skip + ret.append(HistoryData(score=lefts[-1].score, record_at=right_border)) + else: # 暂时没想到其他情况 + raise WhatTheFuckError + return ret