mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
引入pylint对代码进行检查
代码规范:使用PEP8推荐的命名规范 删除Request模块,改为每个游戏的processor单独实现,因为每个游戏的api请求都不太一样 对于io_data_processor: 1. 引入了playwright以应对api套的cloudflare五秒盾 对于top_data_processor: 1. 添加了lxml和pandas的stubs库,并修复了所有type hint错误 对于sql: 1. 使用全局变量保存数据库连接对象,理论上运行一次只会连接一次数据库 2. 移动初始化数据库的hook函数到sql.py 其他: 优化代码 版本推进
This commit is contained in:
@@ -0,0 +1 @@
|
||||
from . import io_data_processor, top_data_processor, tos_data_processor
|
||||
@@ -0,0 +1,262 @@
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
from playwright.async_api import Browser, async_playwright
|
||||
from ujson import loads, JSONDecodeError
|
||||
import aiohttp
|
||||
|
||||
from nonebot import on_regex, get_driver
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
|
||||
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
|
||||
from ..utils.sql import query_bind_info, write_bind_info
|
||||
|
||||
_BROWSER: Browser | None = None
|
||||
|
||||
|
||||
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@IOBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
if decoded_message[0] == 'ID':
|
||||
user_id_stats = await check_user_id(user_id=decoded_message[1][1])
|
||||
if user_id_stats[0] is False:
|
||||
await matcher.finish(user_id_stats[1])
|
||||
else:
|
||||
user_id = decoded_message[1][1]
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await get_user_data(user_name=decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
await matcher.finish('用户信息请求失败')
|
||||
elif user_data[1] is False:
|
||||
await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}')
|
||||
else:
|
||||
user_id = await get_user_id(user_data=user_data[2])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id,
|
||||
user=user_id,
|
||||
game_type='IO'))
|
||||
|
||||
|
||||
@IOStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ME':
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败, 请联系bot主人')
|
||||
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ID':
|
||||
message = await generate_message(user_id=decoded_message[1][1])
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
if isinstance(_BROWSER, Browser):
|
||||
await _BROWSER.close()
|
||||
|
||||
|
||||
async def init_playwright() -> Browser:
|
||||
'''初始化playwright'''
|
||||
global _BROWSER
|
||||
p = await async_playwright().start()
|
||||
_BROWSER = await p.firefox.launch()
|
||||
return _BROWSER
|
||||
|
||||
|
||||
async def get_browser() -> Browser:
|
||||
'''获取浏览器对象'''
|
||||
return _BROWSER or await init_playwright()
|
||||
|
||||
|
||||
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
data = await resp.json()
|
||||
return (True, data['success'], data)
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return (False, False, {})
|
||||
except aiohttp.client_exceptions.ContentTypeError:
|
||||
# 如果有五秒盾就用firefox硬穿
|
||||
browser = await get_browser()
|
||||
page = await browser.new_page()
|
||||
await page.goto(url)
|
||||
attempts = 0
|
||||
while True:
|
||||
text = await page.locator("body").text_content()
|
||||
if text is None:
|
||||
continue
|
||||
attempts += 1
|
||||
if await page.title() == 'Please Wait... | Cloudflare':
|
||||
break
|
||||
try:
|
||||
data = loads(text)
|
||||
except JSONDecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
return (True, data['success'], data)
|
||||
if attempts >= 60:
|
||||
break
|
||||
await page.close()
|
||||
return (True, False, {'error': '绕过五秒盾失败'})
|
||||
|
||||
|
||||
async def get_user_data(user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
|
||||
|
||||
async def get_solo_data(user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取Solo数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_solo_url)
|
||||
|
||||
|
||||
async def get_user_id(user_data: dict) -> str:
|
||||
'''获取用户ID'''
|
||||
return user_data['data']['user']['_id']
|
||||
|
||||
|
||||
async def check_user_id(user_id: str) -> tuple[bool, str]:
|
||||
'''检查用户ID是否有效'''
|
||||
user_data = await get_user_data(user_id=user_id)
|
||||
if user_data[0] is False:
|
||||
return (False, '用户信息请求失败')
|
||||
if user_data[1] is False:
|
||||
return (False, f'用户信息请求错误:\n{user_data[2]["error"]}')
|
||||
if user_id == user_data[2]['data']['user']['_id']:
|
||||
return (True, '')
|
||||
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x')
|
||||
|
||||
|
||||
async def get_league_stats(user_data: dict) -> dict[str, Any]:
|
||||
'''获取排位统计数据'''
|
||||
league = user_data['data']['user']['league']
|
||||
league_stats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] != 0:
|
||||
league_stats['PPS'] = league['pps']
|
||||
league_stats['APM'] = league['apm']
|
||||
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper()
|
||||
if league['rating'] == -1:
|
||||
league_stats['Rank'] = None
|
||||
else:
|
||||
league_stats['Rating'] = round(league['rating'], 2)
|
||||
league_stats['Glicko'] = round(league['glicko'], 2)
|
||||
league_stats['RD'] = round(league['rd'], 2)
|
||||
league_stats['Standing'] = league['standing']
|
||||
league_stats['LPM'] = round((league['pps'] * 24), 2)
|
||||
league_stats['APL'] = round(
|
||||
(league_stats['APM'] / league_stats['LPM']), 2)
|
||||
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
|
||||
league_stats['ADPL'] = round(
|
||||
(league_stats['ADPM'] / league_stats['LPM']), 2)
|
||||
return league_stats
|
||||
|
||||
|
||||
async def get_sprint_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取40L统计数据'''
|
||||
sprint_stats = {}
|
||||
solo = solo_data['data']['records']['40l']
|
||||
if solo['record'] is not None:
|
||||
sprint_stats['Time'] = round(
|
||||
solo['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
if solo['rank'] is not None:
|
||||
sprint_stats['Rank'] = solo['rank']
|
||||
return sprint_stats
|
||||
|
||||
|
||||
async def get_blitz_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取Blitz统计数据'''
|
||||
blitz_stats = {}
|
||||
blitz = solo_data['data']['records']['blitz']
|
||||
if blitz['record'] is not None:
|
||||
blitz_stats['Score'] = blitz['record']['endcontext']['score']
|
||||
if blitz['rank'] is not None:
|
||||
blitz_stats['Rank'] = blitz['rank']
|
||||
return blitz_stats
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, user_id: str = None) -> str:
|
||||
'''生成消息'''
|
||||
user_data, solo_data = await gather(get_user_data(user_name=user_name, user_id=user_id),
|
||||
get_solo_data(user_name=user_name, user_id=user_id))
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_name = user_data[2]['data']['user']['username'].upper()
|
||||
league_stats = await get_league_stats(user_data[2])
|
||||
message = ''
|
||||
if not league_stats:
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if league_stats['Rank'] is None:
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league_stats['Rank'] == 'Z':
|
||||
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
|
||||
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
|
||||
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
|
||||
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
|
||||
if league_stats["VS"] != 0:
|
||||
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
|
||||
if solo_data[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
if solo_data[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
|
||||
sprint_stats, blitz_stats = await gather(get_sprint_stats(solo_data[2]),
|
||||
get_blitz_stats(solo_data[2]))
|
||||
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
|
||||
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
|
||||
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
|
||||
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
|
||||
return message
|
||||
@@ -0,0 +1,182 @@
|
||||
from typing import Any
|
||||
from re import I
|
||||
from lxml import etree
|
||||
from pandas import read_html
|
||||
import aiohttp
|
||||
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
|
||||
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
|
||||
from ..utils.sql import query_bind_info, write_bind_info
|
||||
|
||||
TOPBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
|
||||
TopStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@TOPBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_bind_message(message=event.raw_message, game_type='TOP')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await get_user_data(decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
await matcher.finish('用户信息请求失败')
|
||||
else:
|
||||
if await check_user(user_data[1]) is False:
|
||||
await matcher.finish('用户不存在')
|
||||
user_name = await get_user_name(user_data[1])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP'))
|
||||
|
||||
|
||||
@TopStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOP')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
|
||||
elif decoded_message[0] == 'ME':
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败, 请联系bot主人')
|
||||
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
async def get_user_data(user_name: str) -> tuple[bool, str]:
|
||||
'''获取用户信息'''
|
||||
url = f'http://tetrisonline.pl/top/profile.php?user={user_name}'
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
return (True, await resp.text())
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
logger.error(error)
|
||||
return (False, '')
|
||||
|
||||
|
||||
async def check_user(user_data: str) -> bool:
|
||||
'''如果用户存在返回True, 如果用户不存在返回False'''
|
||||
return user_data.find('user not found!') == -1
|
||||
|
||||
|
||||
async def get_user_name(user_data: str) -> str:
|
||||
'''获取用户名'''
|
||||
data = etree.HTML(user_data).xpath('//div[@class="mycontent"]/h1/text()')
|
||||
if isinstance(data, list):
|
||||
return str(data[0]).replace('\'s profile', '')
|
||||
raise TypeError('预期外行为, 请上报GitHub')
|
||||
|
||||
|
||||
async def get_game_stats(user_data: str) -> dict[str, dict[str, Any]]:
|
||||
'''获取游戏统计数据'''
|
||||
game_stats: dict[str, Any] = {'24H': {}, 'All': {}}
|
||||
html = etree.HTML(user_data)
|
||||
data = html.xpath('//div[@class="mycontent"]//text()')
|
||||
if isinstance(data, list):
|
||||
for i in data:
|
||||
if not isinstance(i, str):
|
||||
i = str(i)
|
||||
i = i.strip()
|
||||
if i.startswith('lpm:'):
|
||||
game_stats['24H']['LPM'] = i.replace('lpm:', '').strip()
|
||||
if i.startswith('apm:'):
|
||||
game_stats['24H']['APM'] = i.replace('apm:', '').strip()
|
||||
if 'LPM' in game_stats['24H'] and 'APM' in game_stats['24H']:
|
||||
break
|
||||
else:
|
||||
raise TypeError('预期外行为, 请上报GitHub')
|
||||
# 如果没有24H统计数据
|
||||
if game_stats['24H'].get('LPM') in [None, ''] or game_stats['24H'].get('APM') in [None, '']:
|
||||
game_stats['24H'].pop('LPM', None)
|
||||
game_stats['24H'].pop('APM', None)
|
||||
else:
|
||||
game_stats['24H']['PPS'] = round(
|
||||
float(game_stats['24H']['LPM']) / 24, 2)
|
||||
game_stats['24H']['APL'] = round(
|
||||
float(game_stats['24H']['APM']) / float(game_stats['24H']['LPM']), 2)
|
||||
game_stats['24H']['LPM'] = round(float(game_stats['24H']['LPM']), 2)
|
||||
game_stats['24H']['APM'] = round(float(game_stats['24H']['APM']), 2)
|
||||
table = html.xpath('//table')
|
||||
if isinstance(table, list):
|
||||
if isinstance(table[0], etree._Element):
|
||||
stats_table = etree.tostring(table[0], encoding='utf-8').decode()
|
||||
df = read_html(stats_table, encoding='utf-8', header=0)[0]
|
||||
results = df.T.to_dict().values()
|
||||
if results:
|
||||
game_stats['All']['LPM'] = 0
|
||||
game_stats['All']['APM'] = 0
|
||||
for i in results:
|
||||
if isinstance(i, dict):
|
||||
game_stats['All']['LPM'] += i['lpm']
|
||||
game_stats['All']['APM'] += i['apm']
|
||||
game_stats['All']['LPM'] = game_stats['All']['LPM'] / \
|
||||
len(results)
|
||||
game_stats['All']['APM'] = game_stats['All']['APM'] / \
|
||||
len(results)
|
||||
game_stats['All']['PPS'] = round(
|
||||
game_stats['All']['LPM'] / 24, 2)
|
||||
game_stats['All']['APL'] = round(
|
||||
float(game_stats['All']['APM']) / float(game_stats['All']['LPM']), 2)
|
||||
game_stats['All']['LPM'] = round(
|
||||
float(game_stats['All']['LPM']), 2)
|
||||
game_stats['All']['APM'] = round(
|
||||
float(game_stats['All']['APM']), 2)
|
||||
else:
|
||||
raise TypeError('预期外行为, 请上报GitHub')
|
||||
return game_stats
|
||||
|
||||
|
||||
async def generate_message(user_name: str) -> str:
|
||||
'''生成消息'''
|
||||
user_data = await get_user_data(user_name)
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if await check_user(user_data[1]) is False:
|
||||
return '用户不存在'
|
||||
user_name = await get_user_name(user_data[1])
|
||||
game_stats = await get_game_stats(user_data[1])
|
||||
message = ''
|
||||
if game_stats['24H'] and game_stats['All']:
|
||||
message += f'用户 {user_name} 24小时内统计数据为: '
|
||||
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
|
||||
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
|
||||
message += '\n历史统计数据为: '
|
||||
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
|
||||
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
|
||||
elif game_stats['24H'] and not game_stats['All']:
|
||||
message += f'用户 {user_name} 24小时内统计数据为: '
|
||||
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
|
||||
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
|
||||
message += '\n暂无历史统计数据'
|
||||
message += '\n( 这理论上不该存在, 如果你看到了, 请联系bot主人查看后台'
|
||||
logger.error(f'老实说这个不算Error, 但是理论上不应该有, 如果你看到了这条日志, 我希望你能来Github发个issue(\
|
||||
user_name: {user_name}\
|
||||
user_data: {user_data}\
|
||||
game_stats: {game_stats}')
|
||||
elif not game_stats['24H'] and game_stats['All']:
|
||||
message += f'用户 {user_name} 暂无24小时内统计数据, 历史统计数据为: '
|
||||
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
|
||||
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
|
||||
else:
|
||||
message += f'用户 {user_name} 暂无24小时内统计数据, 暂无历史统计数据'
|
||||
return message
|
||||
@@ -0,0 +1,164 @@
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
import aiohttp
|
||||
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
|
||||
from ..utils.message_analyzer import handle_stats_query_message
|
||||
|
||||
TOSStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
|
||||
flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@TOSStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOS')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ':
|
||||
if decoded_message[1][1] == event.self_id:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
message = await generate_message(tea_id=decoded_message[1][1])
|
||||
elif decoded_message[0] == 'ME':
|
||||
message = await generate_message(tea_id=event.sender.user_id)
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
data = await resp.json()
|
||||
return (True, data['success'], data)
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return (False, False, {})
|
||||
|
||||
|
||||
async def get_user_info(user_name: str = None,
|
||||
tea_id: int = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户信息'''
|
||||
if user_name is not None and tea_id is None:
|
||||
user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}'
|
||||
elif user_name is None and tea_id is not None:
|
||||
user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
|
||||
|
||||
async def get_user_data(user_name: str = None,
|
||||
tea_id: int = None,
|
||||
other_parameter: str = ''
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and tea_id is None:
|
||||
user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}'
|
||||
elif user_name is None and tea_id is not None:
|
||||
user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
|
||||
|
||||
async def get_rank_stats(user_info: dict) -> dict[str, float]:
|
||||
'''获取Rank数据'''
|
||||
data = user_info['data']
|
||||
if int(data['rankedGames']) != 0:
|
||||
rank_stats = {}
|
||||
rank_stats['Rating'] = round(float(data['ratingNow']), 2)
|
||||
rank_stats['RD'] = round(float(data['rdNow']), 2)
|
||||
rank_stats['Vol'] = round(float(data['volNow']), 3)
|
||||
return rank_stats
|
||||
|
||||
|
||||
async def get_game_data(user_data: dict) -> dict[str, int | float]:
|
||||
'''获取游戏数据'''
|
||||
if user_data['data'] != []:
|
||||
game_data: dict[str, int | float] = {}
|
||||
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = num = 0
|
||||
for i in user_data['data']:
|
||||
# 排除单人局和时间为0的游戏
|
||||
if i['num_players'] == 1 or i['time'] == 0:
|
||||
continue
|
||||
# 茶:不计算没挖掘的局, 即使apm和lpm也如此
|
||||
if i['dig'] is None:
|
||||
continue
|
||||
# 加权计算
|
||||
time = i['time'] / 1000
|
||||
lpm = 24 * (i['pieces'] / time)
|
||||
apm = (i['attack'] / time) * 60
|
||||
adpm = ((i['attack'] + i['dig']) / time) * 60
|
||||
weighted_total_lpm += lpm * time
|
||||
weighted_total_apm += apm * time
|
||||
weighted_total_adpm += adpm * time
|
||||
total_time += time
|
||||
num += 1
|
||||
if num == 50:
|
||||
break
|
||||
if num > 0:
|
||||
game_data['NUM'] = num
|
||||
game_data['LPM'] = round((weighted_total_lpm / total_time), 2)
|
||||
game_data['APM'] = round((weighted_total_apm / total_time), 2)
|
||||
game_data['ADPM'] = round((weighted_total_adpm / total_time), 2)
|
||||
game_data['PPS'] = round((game_data['LPM'] / 24), 2)
|
||||
game_data['APL'] = round((game_data['APM'] / game_data['LPM']), 2)
|
||||
game_data['ADPL'] = round(
|
||||
(game_data['ADPM'] / game_data['LPM']), 2)
|
||||
game_data['VS'] = round((game_data['ADPM'] / 60 * 100), 2)
|
||||
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
|
||||
return game_data
|
||||
|
||||
|
||||
async def get_pb_data(user_info: dict) -> dict[str, float | str]:
|
||||
'''获取PB数据'''
|
||||
pb_data: dict[str, float | str] = {}
|
||||
data = user_info['data']
|
||||
if int(data['PBSprint']) != 2147483647:
|
||||
pb_data['Sprint'] = round(float(data['PBSprint']) / 1000, 2)
|
||||
if int(data['PBMarathon']) != 0:
|
||||
pb_data['Marathon'] = data['PBMarathon']
|
||||
if int(data['PBChallenge']) != 0:
|
||||
pb_data['Challenge'] = data['PBChallenge']
|
||||
return pb_data
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, tea_id: int = None) -> str:
|
||||
'''生成消息'''
|
||||
user_info, user_data = await gather(get_user_info(user_name=user_name, tea_id=tea_id),
|
||||
get_user_data(user_name=user_name, tea_id=tea_id))
|
||||
if user_info[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_info[1] is False:
|
||||
return f'用户信息请求错误:\n{user_info[2]["error"]}'
|
||||
rank_stats, pb_data = await gather(get_rank_stats(user_info[2]), get_pb_data(user_info[2]))
|
||||
message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) '
|
||||
if not rank_stats:
|
||||
message += '暂无段位统计数据'
|
||||
else:
|
||||
message += f', 段位分 {rank_stats["Rating"]}±{rank_stats["RD"]} ({rank_stats["Vol"]}) '
|
||||
if user_data[0] is False:
|
||||
message = f'{message.rstrip()}\n游戏数据请求失败'
|
||||
elif user_data[1] is False:
|
||||
message = f'{message.rstrip()}\n游戏数据请求错误:\n{user_data[2]["error"]}'
|
||||
else:
|
||||
game_data = await get_game_data(user_data[2])
|
||||
if not game_data:
|
||||
message += ', 暂无游戏数据'
|
||||
else:
|
||||
message += f', 最近 {game_data["NUM"]} 局数据'
|
||||
message += f'\nL\'PM: {game_data["LPM"]} ( {game_data["PPS"]} pps )'
|
||||
message += f'\nAPM:{game_data["APM"]} ( x{game_data["APL"]} )'
|
||||
message += f'\nADPM:{game_data["ADPM"]} ( x{game_data["ADPL"]} ) ( {game_data["VS"]}vs )'
|
||||
message += f'\n40L: {pb_data["Sprint"]}s' if 'Sprint' in pb_data else ''
|
||||
message += f'\nMarathon: {pb_data["Marathon"]}' if 'Marathon' in pb_data else ''
|
||||
message += f'\nChallenge: {pb_data["Challenge"]}' if 'Challenge' in pb_data else ''
|
||||
return message
|
||||
Reference in New Issue
Block a user