🎨 拆分函数

This commit is contained in:
2024-05-10 11:13:39 +08:00
parent a5c4e7df5c
commit 8ba3f3c3f4

View File

@@ -6,7 +6,7 @@ from hashlib import md5, sha512
from math import ceil, floor
from re import match
from statistics import mean
from typing import Literal, NamedTuple
from typing import Literal
from urllib.parse import urlunparse
from zoneinfo import ZoneInfo
@@ -43,7 +43,7 @@ from .schemas.response import ProcessedData, RawResponse
from .schemas.user import User
from .schemas.user_info import NeverPlayedLeague, NeverRatedLeague, RatedLeague, UserInfo
from .schemas.user_info import SuccessModel as InfoSuccess
from .schemas.user_records import SoloRecord, UserRecords
from .schemas.user_records import MultiRecord, SoloRecord, UserRecords
from .schemas.user_records import SuccessModel as RecordsSuccess
from .typing import Rank
@@ -60,6 +60,55 @@ def identify_user_info(info: str) -> User | MessageFormatError:
return MessageFormatError('用户名/ID不合法')
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
value_max = 10 * ceil(max(values) / 10)
value_min = 10 * floor(min(values) / 10)
return value_max, value_min
def get_split(value_max: int, value_min: int) -> tuple[int, int]:
offset = 0
overflow = 0
while True:
if (new_max_value := value_max + offset + overflow) > TR_MAX:
overflow -= 1
continue
if (new_min_value := value_min - offset + overflow) < TR_MIN:
overflow += 1
continue
if ((new_max_value - new_min_value) / 40).is_integer():
split_value = int((value_max + offset - (value_min - offset)) / 4)
break
offset += 1
return split_value, offset + overflow
def get_specified_point(
previous_point: TETRIOInfo.TetraLeagueHistory.Data,
behind_point: TETRIOInfo.TetraLeagueHistory.Data,
point_time: datetime,
) -> TETRIOInfo.TetraLeagueHistory.Data:
"""根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据
Args:
previous_point (TETRIOInfo.TetraLeagueHistory.Data): 前面的数据点
behind_point (TETRIOInfo.TetraLeagueHistory.Data): 后面的数据点
point_time (datetime): 要推算的点的位置
Returns:
TETRIOInfo.TetraLeagueHistory.Data: 要推算的点的数据
"""
# 求两个点的斜率
slope = (behind_point.tr - previous_point.tr) / (
datetime.timestamp(behind_point.record_at) - datetime.timestamp(previous_point.record_at)
)
return TETRIOInfo.TetraLeagueHistory.Data(
record_at=point_time,
tr=previous_point.tr + slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.record_at)),
)
class Processor(ProcessorMeta):
user: User
raw_response: RawResponse
@@ -125,190 +174,17 @@ class Processor(ProcessorMeta):
self.command_type = 'query'
await self.get_user()
user_info, user_records = await gather(self.get_user_info(), self.get_user_records())
user_name = user_info.data.user.username.upper()
league = user_info.data.user.league
sprint = user_records.data.records.sprint
blitz = user_records.data.records.blitz
if isinstance(league, RatedLeague) and league.vs is not None:
today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
forward = timedelta(days=9)
start_time = (today - forward).astimezone(UTC)
async with get_session() as session:
historical_data = (
await session.scalars(
select(HistoricalData)
.where(HistoricalData.trigger_time >= start_time)
.where(HistoricalData.game_platform == GAME_TYPE)
.where(HistoricalData.user_unique_identifier == self.user.unique_identifier)
)
).all()
if historical_data:
extra = (
await session.scalars(
select(HistoricalData)
.where(HistoricalData.game_platform == GAME_TYPE)
.where(HistoricalData.user_unique_identifier == self.user.unique_identifier)
.order_by(HistoricalData.id.desc())
.where(HistoricalData.id < min([i.id for i in historical_data]))
.limit(1)
)
).one_or_none()
if extra is not None:
historical_data = list(historical_data)
historical_data.append(extra)
class HistoricalTr(NamedTuple):
time: datetime
tr: float
histories = [
HistoricalTr(
time=i.processed_data.user_info.cache.cached_at.astimezone(ZoneInfo('Asia/Shanghai')),
tr=i.processed_data.user_info.data.user.league.rating,
)
for i in historical_data
if isinstance(i.processed_data, ProcessedData)
and i.processed_data.user_info is not None
and isinstance(i.processed_data.user_info.data.user.league, RatedLeague)
]
def get_specified_point(
previous_point: HistoricalTr, behind_point: HistoricalTr, point_time: datetime
) -> HistoricalTr:
"""根据给出的 previous_point 和 behind_point, 推算 point_time 点处的数据
Args:
previous_point (HistoricalTr): 前面的数据点
behind_point (HistoricalTr): 后面的数据点
point_time (datetime): 要推算的点的位置
Returns:
HistoricalTr: 要推算的点的数据
"""
# 求两个点的斜率
slope = (behind_point.tr - previous_point.tr) / (
datetime.timestamp(behind_point.time) - datetime.timestamp(previous_point.time)
)
return HistoricalTr(
time=point_time,
tr=previous_point.tr
+ slope * (datetime.timestamp(point_time) - datetime.timestamp(previous_point.time)),
)
# 按照时间排序
histories = sorted(histories, key=lambda x: x.time)
for index, value in enumerate(histories):
# 在历史记录里找有没有今天0点后的数据
if value.time > today:
histories = histories[:index] + [
get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000))
]
break
else:
histories.append(
get_specified_point(
histories[-1],
HistoricalTr(user_info.cache.cached_at, league.rating),
today.replace(microsecond=1000),
)
)
if histories[0].time < (today - forward):
histories[0] = get_specified_point(
histories[0],
histories[1],
today - forward,
)
else:
histories.insert(0, HistoricalTr((today - forward), histories[0].tr))
def get_value_bounds(values: list[int | float]) -> tuple[int, int]:
value_max = 10 * ceil(max(values) / 10)
value_min = 10 * floor(min(values) / 10)
return value_max, value_min
def get_split(value_max: int, value_min: int) -> tuple[int, int]:
offset = 0
overflow = 0
while True:
if (new_max_value := value_max + offset + overflow) > TR_MAX:
overflow -= 1
continue
if (new_min_value := value_min - offset + overflow) < TR_MIN:
overflow += 1
continue
if ((new_max_value - new_min_value) / 40).is_integer():
split_value = int((value_max + offset - (value_min - offset)) / 4)
break
offset += 1
return split_value, offset + overflow
value_max, value_min = get_value_bounds([i.tr for i in histories])
split_value, offset = get_split(value_max, value_min)
if sprint.record is None:
sprint_value = 'N/A'
else:
if not isinstance(sprint.record, SoloRecord):
raise WhatTheFuckError('40L记录不是单人记录')
duration = timedelta(milliseconds=sprint.record.endcontext.final_time).total_seconds()
sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004
if blitz.record is None:
blitz_value = 'N/A'
else:
if not isinstance(blitz.record, SoloRecord):
raise WhatTheFuckError('Blitz记录不是单人记录')
blitz_value = f'{blitz.record.endcontext.score:,}'
async with HostPage(
await render(
'tetrio/info',
TETRIOInfo(
user=TETRIOInfo.User(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
if user_info.data.user.avatar_revision is not None
else f'{{"type":"identicon","hash":"{md5(user_info.data.user.id.encode()).hexdigest()}"}}', # noqa: S324
name=user_name,
bio=user_info.data.user.bio,
),
ranking=TETRIOInfo.Ranking(
rating=round(league.glicko, 2),
rd=round(league.rd, 2),
),
tetra_league=TETRIOInfo.TetraLeague(
rank=league.rank,
tr=round(league.rating, 2),
global_rank=league.standing,
pps=league.pps,
lpm=round(lpm := (league.pps * 24), 2),
apm=league.apm,
apl=round(league.apm / lpm, 2),
vs=league.vs,
adpm=round(adpm := (league.vs * 0.6), 2),
adpl=round(adpm / lpm, 2),
),
tetra_league_history=TETRIOInfo.TetraLeagueHistory(
data=[TETRIOInfo.TetraLeagueHistory.Data(record_at=time, tr=tr) for time, tr in histories],
split_interval=split_value,
min_tr=value_min,
max_tr=value_max,
offset=offset,
),
radar=TETRIOInfo.Radar(
app=(app := (league.apm / (60 * league.pps))),
dsps=(dsps := ((league.vs / 100) - (league.apm / 60))),
dspp=(dspp := (dsps / league.pps)),
ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25,
ge=2 * ((app * dsps) / league.pps),
),
sprint=sprint_value,
blitz=blitz_value,
),
)
) as page_hash:
return UniMessage.image(
raw=await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
)
if isinstance(sprint.record, MultiRecord) or isinstance(blitz.record, MultiRecord):
raise WhatTheFuckError('单人游戏记录是多人游戏记录')
try:
return UniMessage.image(raw=await self.make_query_image(self.user, user_info, sprint.record, blitz.record))
except TypeError:
...
# fallback
league = user_info.data.user.league
user_name = user_info.data.user.username.upper()
ret_message = ''
if isinstance(league, NeverPlayedLeague):
ret_message += f'用户 {user_name} 没有排位统计数据'
@@ -329,21 +205,148 @@ class Processor(ProcessorMeta):
if league.vs is not None:
adpm = league.vs * 0.6
ret_message += f'\nADPM: {round(adpm,2)} ( x{round(adpm/lpm,2)} ) ( {league.vs}vs )'
user_records = await self.get_user_records()
sprint = user_records.data.records.sprint
if sprint.record is not None:
if not isinstance(sprint.record, SoloRecord):
raise WhatTheFuckError('40L记录不是单人记录')
ret_message += f'\n40L: {round(sprint.record.endcontext.final_time/1000,2)}s'
ret_message += f' ( #{sprint.rank} )' if sprint.rank is not None else ''
blitz = user_records.data.records.blitz
if blitz.record is not None:
if not isinstance(blitz.record, SoloRecord):
raise WhatTheFuckError('Blitz记录不是单人记录')
ret_message += f'\nBlitz: {blitz.record.endcontext.score}'
ret_message += f' ( #{blitz.rank} )' if blitz.rank is not None else ''
return UniMessage(ret_message)
@staticmethod
async def query_historical_data(user: User, user_info: InfoSuccess) -> list[TETRIOInfo.TetraLeagueHistory.Data]:
today = datetime.now(ZoneInfo('Asia/Shanghai')).replace(hour=0, minute=0, second=0, microsecond=0)
forward = timedelta(days=9)
start_time = (today - forward).astimezone(UTC)
async with get_session() as session:
historical_data = (
await session.scalars(
select(HistoricalData)
.where(HistoricalData.trigger_time >= start_time)
.where(HistoricalData.game_platform == GAME_TYPE)
.where(HistoricalData.user_unique_identifier == user.unique_identifier)
)
).all()
if historical_data:
extra = (
await session.scalars(
select(HistoricalData)
.where(HistoricalData.game_platform == GAME_TYPE)
.where(HistoricalData.user_unique_identifier == user.unique_identifier)
.order_by(HistoricalData.id.desc())
.where(HistoricalData.id < min([i.id for i in historical_data]))
.limit(1)
)
).one_or_none()
if extra is not None:
historical_data = list(historical_data)
historical_data.append(extra)
histories = [
TETRIOInfo.TetraLeagueHistory.Data(
record_at=i.processed_data.user_info.cache.cached_at.astimezone(ZoneInfo('Asia/Shanghai')),
tr=i.processed_data.user_info.data.user.league.rating,
)
for i in historical_data
if isinstance(i.processed_data, ProcessedData)
and i.processed_data.user_info is not None
and isinstance(i.processed_data.user_info.data.user.league, RatedLeague)
]
# 按照时间排序
histories = sorted(histories, key=lambda x: x.record_at)
for index, value in enumerate(histories):
# 在历史记录里找有没有今天0点后的数据
if value.record_at > today:
histories = histories[:index] + [
get_specified_point(histories[index - 1], histories[index], today.replace(microsecond=1000))
]
break
else:
histories.append(
get_specified_point(
histories[-1],
TETRIOInfo.TetraLeagueHistory.Data(
record_at=user_info.cache.cached_at, tr=user_info.data.user.league.rating
),
today.replace(microsecond=1000),
)
)
if histories[0].record_at < (today - forward):
histories[0] = get_specified_point(
histories[0],
histories[1],
today - forward,
)
else:
histories.insert(0, TETRIOInfo.TetraLeagueHistory.Data(record_at=today - forward, tr=histories[0].tr))
return histories
@staticmethod
async def make_query_image(
user: User, user_info: InfoSuccess, sprint: SoloRecord | None, blitz: SoloRecord | None
) -> bytes:
league = user_info.data.user.league
if not isinstance(league, RatedLeague) or league.vs is None:
raise TypeError
user_name = user_info.data.user.username.upper()
histories = await Processor.query_historical_data(user, user_info)
value_max, value_min = get_value_bounds([i.tr for i in histories])
split_value, offset = get_split(value_max, value_min)
if sprint is not None:
duration = timedelta(milliseconds=sprint.endcontext.final_time).total_seconds()
sprint_value = f'{duration:.1f}s' if duration < 60 else f'{duration // 60:.0f}m {duration % 60:.1f}s' # noqa: PLR2004
else:
sprint_value = 'N/A'
blitz_value = f'{blitz.endcontext.score:,}' if blitz is not None else 'N/A'
async with HostPage(
await render(
'tetrio/info',
TETRIOInfo(
user=TETRIOInfo.User(
avatar=f'https://tetr.io/user-content/avatars/{user_info.data.user.id}.jpg?rv={user_info.data.user.avatar_revision}'
if user_info.data.user.avatar_revision is not None
else f'{{"type":"identicon","hash":"{md5(user_info.data.user.id.encode()).hexdigest()}"}}', # noqa: S324
name=user_name,
bio=user_info.data.user.bio,
),
ranking=TETRIOInfo.Ranking(
rating=round(league.glicko, 2),
rd=round(league.rd, 2),
),
tetra_league=TETRIOInfo.TetraLeague(
rank=league.rank,
tr=round(league.rating, 2),
global_rank=league.standing,
pps=league.pps,
lpm=round(lpm := (league.pps * 24), 2),
apm=league.apm,
apl=round(league.apm / lpm, 2),
vs=league.vs,
adpm=round(adpm := (league.vs * 0.6), 2),
adpl=round(adpm / lpm, 2),
),
tetra_league_history=TETRIOInfo.TetraLeagueHistory(
data=histories,
split_interval=split_value,
min_tr=value_min,
max_tr=value_max,
offset=offset,
),
radar=TETRIOInfo.Radar(
app=(app := (league.apm / (60 * league.pps))),
dsps=(dsps := ((league.vs / 100) - (league.apm / 60))),
dspp=(dspp := (dsps / league.pps)),
ci=150 * dspp - 125 * app + 50 * (league.vs / league.apm) - 25,
ge=2 * ((app * dsps) / league.pps),
),
sprint=sprint_value,
blitz=blitz_value,
),
)
) as page_hash:
return await screenshot(urlunparse(('http', get_self_netloc(), f'/host/{page_hash}.html', '', '', '')))
async def get_user(self) -> None:
"""
用于获取 UserName 和 UserID 的函数