mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
149 lines
6.9 KiB
Python
149 lines
6.9 KiB
Python
from nonebot import on_regex
|
||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||
from nonebot.matcher import Matcher
|
||
from nonebot.log import logger
|
||
|
||
from typing import Any
|
||
from re import I
|
||
from lxml import etree
|
||
from pandas import read_html
|
||
import aiohttp
|
||
|
||
from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage
|
||
from ..Utils.SQL import queryBindInfo, writeBindInfo
|
||
|
||
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
|
||
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
|
||
|
||
|
||
@topBind.handle()
|
||
async def _(event: MessageEvent, matcher: Matcher):
|
||
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='TOP')
|
||
if decodedMessage[0] is None:
|
||
await matcher.finish(decodedMessage[1][0])
|
||
elif decodedMessage[0] == 'Name':
|
||
userData = await getUserData(decodedMessage[1][1])
|
||
if userData[0] is False:
|
||
await matcher.finish('用户信息请求失败')
|
||
else:
|
||
if await checkUser(userData[1]) is False:
|
||
matcher.finish('用户不存在')
|
||
userName = await getUserName(userData[1])
|
||
if event.sender.user_id is None: # 理论上是不会有None出现的,ide快乐行属于是(
|
||
logger.error('获取QQ号失败')
|
||
await matcher.finish('获取QQ号失败')
|
||
await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userName, gameType='TOP'))
|
||
|
||
|
||
@topStats.handle()
|
||
async def _(event: MessageEvent, matcher: Matcher):
|
||
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOP')
|
||
if decodedMessage[0] is None:
|
||
await matcher.finish(decodedMessage[1][0])
|
||
elif decodedMessage[0] == 'AT':
|
||
if event.is_tome() is True:
|
||
await matcher.finish(message='不能查询bot的信息')
|
||
bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='TOP')
|
||
if bindInfo is None:
|
||
message = '未查询到绑定信息'
|
||
else:
|
||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(bindInfo)}')
|
||
elif decodedMessage[0] == 'ME':
|
||
if event.sender.user_id is None:
|
||
logger.error('获取QQ号失败')
|
||
await matcher.finish('获取QQ号失败,请联系bot主人')
|
||
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='TOP')
|
||
if bindInfo is None:
|
||
message = '未查询到绑定信息'
|
||
else:
|
||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(bindInfo)}')
|
||
elif decodedMessage[0] == 'Name':
|
||
message = await generateMessage(decodedMessage[1][1])
|
||
await matcher.finish(message=message)
|
||
|
||
|
||
async def getUserData(userName: str) -> tuple[bool, str]:
|
||
Url = f'http://tetrisonline.pl/top/profile.php?user={userName}'
|
||
# 因为top查数据没有api 所以不得不再写一次请求(
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
async with session.get(Url) as resp:
|
||
return (True, await resp.text())
|
||
except aiohttp.client_exceptions.ClientConnectorError as e:
|
||
logger.error(e)
|
||
return (False, '')
|
||
|
||
|
||
async def checkUser(userData: str) -> bool:
|
||
return True if userData.find('user not found!') else False
|
||
|
||
|
||
async def getUserName(userData: str) -> str:
|
||
return etree.HTML(userData).xpath('//div[@class="mycontent"]/h1/text()')[0].replace('\'s profile', '')
|
||
|
||
|
||
async def getGameStats(userData: str) -> dict[str, Any]:
|
||
gameStats = {}
|
||
html = etree.HTML(userData)
|
||
for i in html.xpath('//div[@class="mycontent"]//text()'):
|
||
i = i.strip()
|
||
if i.startswith('lpm:'):
|
||
gameStats['24HStats'] = True
|
||
gameStats['24HLPM'] = i.replace('lpm:', '').strip()
|
||
elif i.startswith('apm:'):
|
||
gameStats['24HStats'] = True
|
||
gameStats['24HAPM'] = i.replace('apm:', '').strip()
|
||
if '24HLPM' in gameStats and '24HAPM' in gameStats:
|
||
break
|
||
# 如果没有24H统计数据
|
||
if gameStats.get('24HLPM') in [None, ''] or gameStats.get('24HAPM') in [None, '']:
|
||
gameStats.pop('24HLPM')
|
||
gameStats.pop('24HAPM')
|
||
gameStats['24HStats'] = False
|
||
else:
|
||
gameStats['24HPPS'] = round(float(gameStats['24HLPM']) / 24, 2)
|
||
gameStats['24HAPL'] = round(
|
||
float(gameStats['24HAPM']) / float(gameStats['24HLPM']), 2)
|
||
gameStats['24HLPM'] = round(float(gameStats['24HLPM']), 2)
|
||
gameStats['24HAPM'] = round(float(gameStats['24HAPM']), 2)
|
||
statsTable = html.xpath('//table')
|
||
statsTable = etree.tostring(statsTable[0], encoding='utf-8').decode()
|
||
df = read_html(statsTable, encoding='utf-8', header=0)[0]
|
||
results = list(df.T.to_dict().values())
|
||
if results != []:
|
||
gameStats['AllStats'] = True
|
||
gameStats['AllLPM'] = 0
|
||
gameStats['AllAPM'] = 0
|
||
for i in results:
|
||
gameStats['AllLPM'] += i['lpm']
|
||
gameStats['AllAPM'] += i['apm']
|
||
gameStats['AllLPM'] = gameStats['AllLPM'] / len(results)
|
||
gameStats['AllAPM'] = gameStats['AllAPM'] / len(results)
|
||
gameStats['AllPPS'] = round(gameStats['AllLPM'] / 24, 2)
|
||
gameStats['AllAPL'] = round(
|
||
float(gameStats['AllAPM']) / float(gameStats['AllLPM']), 2)
|
||
gameStats['AllLPM'] = round(float(gameStats['AllLPM']), 2)
|
||
gameStats['AllAPM'] = round(float(gameStats['AllAPM']), 2)
|
||
else:
|
||
gameStats['AllStats'] = False
|
||
return gameStats
|
||
|
||
|
||
async def generateMessage(userName: str) -> str:
|
||
userData = await getUserData(userName)
|
||
if userData[0] is False:
|
||
return '用户信息请求失败'
|
||
if await checkUser(userData[1]) is False:
|
||
return '用户不存在'
|
||
userName = await getUserName(userData[1])
|
||
gameStats = await getGameStats(userData[1])
|
||
if gameStats['24HStats'] is False and gameStats['AllStats'] is False:
|
||
message = f'用户 {userName} 暂无24小时内统计数据, 暂无历史统计数据'
|
||
elif gameStats['24HStats'] is True and gameStats['AllStats'] is False:
|
||
message = f'用户 {userName} 24小时内统计数据为: \nL\'PM: {gameStats["24HLPM"]} ( {gameStats["24HPPS"]} pps )\nAPM: {gameStats["24HAPM"]} ( x{gameStats["24HAPL"]} )\n暂无历史统计数据\n(这真的存在吗'
|
||
elif gameStats['24HStats'] is False and gameStats['AllStats'] is True:
|
||
message = f'用户 {userName} 暂无24小时内统计数据, 历史统计数据为: \nL\'PM: {gameStats["AllLPM"]} ( {gameStats["AllPPS"]} pps )\nAPM: {gameStats["AllAPM"]} ( x{gameStats["AllAPL"]} )'
|
||
else:
|
||
message = f'用户 {userName} 24小时内统计数据为: \nL\'PM: {gameStats["24HLPM"]} ( {gameStats["24HPPS"]} pps )\nAPM: {gameStats["24HAPM"]} ( x{gameStats["24HAPL"]} )\n历史统计数据为: \nL\'PM: {gameStats["AllLPM"]} ( {gameStats["AllPPS"]} pps )\nAPM: {gameStats["AllAPM"]} ( x{gameStats["AllAPL"]} )'
|
||
return message
|