修复所有type hint(当时写的时候mypy炸了

使用神秘新架构
独立request模块
版本推进
fixed #1 [BUG] 茶服无法查询用户名内带有大写字母的用户
This commit is contained in:
2022-07-24 15:15:39 +08:00
parent 0d2b37e78a
commit f1291a9923
12 changed files with 393 additions and 392 deletions

3
.gitignore vendored
View File

@@ -1 +1,4 @@
dist dist
test*
Untitled*
*copy*

View File

@@ -0,0 +1,211 @@
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from typing import Any, Mapping
from asyncio import gather
from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage
from ..Utils.SQL import queryBindInfo, writeBindInfo
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
@ioBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
if decodedMessage[0] == 'ID':
userIDStats = await checkUserID(userID=decodedMessage[1][1])
if userIDStats[0] is False:
await matcher.finish(userIDStats[1])
else:
userID = decodedMessage[1][1]
elif decodedMessage[0] == 'Name':
userData = await getUserData(userName=decodedMessage[1][1])
if userData[0] is False:
await matcher.finish('用户信息请求失败')
elif userData[1] is False:
await matcher.finish(f'用户信息请求错误:\n{userData[2]["error"]}')
else:
userID = await getUserID(userData=userData[2])
if event.sender.user_id is None: # 理论上是不会有None出现的ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userID, gameType='IO'))
@ioStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
elif decodedMessage[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败请联系bot主人')
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ID':
message = await generateMessage(userID=decodedMessage[1][1])
elif decodedMessage[0] == 'Name':
message = await generateMessage(userName=decodedMessage[1][1])
await matcher.finish(message=message)
async def getUserData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getSoloData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取Solo数据
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getSoloData: 预期外行为请上报GitHub')
return await request(Url=userSoloUrl)
async def getUserID(userData: dict) -> str:
return userData['data']['user']['_id']
async def checkUserID(userID: str) -> tuple[bool, str]:
userData = await getUserData(userID=userID)
if userData[0] is False:
return (False, '用户信息请求失败')
elif userData[1] is False:
return (False, f'用户信息请求错误:\n{userData[2]["error"]}')
elif userID == userData[2]['data']['user']['_id']:
return (True, '')
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.checkUserID: 服务器返回的userID和用户提供的不一致这种情况理论上不应该发生以防万一还是写一下x')
async def getLeagueStats(userData: dict) -> dict[str, Any]:
# 获取排位统计数据
league = userData['data']['user']['league']
leagueStats: dict[str, Any] = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round(
(leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round(
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
async def getSprintStats(soloData: dict) -> Mapping[str, bool | int | float]:
# 获取40L统计数据
sprintStats = {}
if soloData['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
if soloData['data']['records']['40l']['rank'] is None:
sprintStats['Rank'] = False
else:
sprintStats['Rank'] = soloData['data']['records']['40l']['rank']
sprintStats['Time'] = round(
soloData['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
async def getBlitzStats(soloData: dict) -> dict[str, Any]:
# 获取Blitz统计数据
blitzStats = {}
if soloData['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
if soloData['data']['records']['blitz']['rank'] is None:
blitzStats['Rank'] = False
else:
blitzStats['Rank'] = soloData['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
async def generateMessage(userName: str = None, userID: str = None) -> str:
# 生成消息
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
if userData[0] is False:
return '用户信息请求失败'
elif userData[1] is False:
return f'用户信息请求错误:\n{userData[2]["error"]}'
userName = userData[2]['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData[2])
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData[0] is False:
return f'{message}\nSolo统计数据请求失败'
elif soloData[1] is False:
return f'{message}\nSolo统计数据请求错误:\n{soloData[2]["error"]}'
sprintStats, blitzStats = await gather(getSprintStats(soloData[2]), getBlitzStats(soloData[2]))
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -1,74 +1,83 @@
from nonebot.log import logger from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from typing import Any
from asyncio import gather from asyncio import gather
import aiohttp from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import *
from ..Utils.SQL import *
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
async def request(Url: str) -> dict[str, bool | dict[str, any]]: @tosStats.handle()
# 封装请求函数 async def _(event: MessageEvent, matcher: Matcher):
data = {} decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS')
try: if decodedMessage[0] is None:
async with aiohttp.ClientSession() as session: await matcher.finish(decodedMessage[1][0])
async with session.get(Url) as resp: elif decodedMessage[0] == 'AT' or decodedMessage[0] == 'QQ':
data['Status'] = True if decodedMessage[1][1] == event.self_id:
data['Data'] = await resp.json() await matcher.finish(message='不能查询bot的信息')
data['Success'] = data['Data']['success'] message = await generateMessage(teaID=decodedMessage[1][1])
except aiohttp.client_exceptions.ClientConnectorError as e: elif decodedMessage[0] == 'ME':
logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}') message = await generateMessage(teaID=event.sender.user_id)
data['Status'] = False elif decodedMessage[0] == 'Name':
finally: message = await generateMessage(userName=decodedMessage[1][1])
return data await matcher.finish(message=message)
async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool | dict[str, any]]: async def getUserInfo(userName: str = None, teaID: int = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户信息 # 获取用户信息
if userName is not None and teaID is None: if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}' userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
userInfo = await request(Url=userDataUrl) elif userName is None and teaID is not None:
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}' userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
userInfo = await request(Url=userDataUrl)
else: else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误') raise ValueError(
return userInfo '[TETRIS STATS] TOSDataProcessing.getUserInfo: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool | dict[str, any]]: async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据 # 获取用户数据
if userName is not None and teaID is None: if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}' userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
userData = await request(Url=userDataUrl) elif userName is None and teaID is not None:
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}' userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
userData = await request(Url=userDataUrl)
else: else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误') raise ValueError(
return userData '[TETRIS STATS] TOSDataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getRankStats(userInfo: dict) -> dict[str, bool | float]: async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
# 获取Rank数据 # 获取Rank数据
rankStats = {} rankStats: dict[str, bool | float] = {}
if int(userInfo['Data']['data']['rankedGames']) == 0: if int(userInfo['data']['rankedGames']) == 0:
rankStats['Played'] = False rankStats['Played'] = False
else: else:
rankStats['Played'] = True rankStats['Played'] = True
rankStats['Rating'] = round( rankStats['Rating'] = round(
float(userInfo['Data']['data']['ratingNow']), 2) float(userInfo['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2) rankStats['RD'] = round(float(userInfo['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3) rankStats['Vol'] = round(float(userInfo['data']['volNow']), 3)
return rankStats return rankStats
async def getGameData(userData: dict) -> dict[str, bool | int | float]: async def getGameData(userData: dict) -> dict[str, bool | int | float]:
# 获取游戏数据 # 获取游戏数据
gameData = {} gameData: dict[str, bool | int | float] = {}
if userData['Data']['data'] == []: if userData['data'] == []:
gameData['Played'] = False gameData['Played'] = False
else: else:
gameData['Played'] = True gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0 weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['Data']['data']: for i in userData['data']:
# 排除单人局和时间为0的游戏 # 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0: if i['num_players'] == 1 or i['time'] == 0:
continue continue
@@ -95,47 +104,48 @@ async def getGameData(userData: dict) -> dict[str, bool | int | float]:
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2) gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2) gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2) gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息 # TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
return gameData return gameData
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]: async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
# 获取PB数据 # 获取PB数据
PBData = {} PBData: dict[str, bool | float | str] = {}
if int(userInfo['Data']['data']['PBSprint']) == 2147483647: if int(userInfo['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False PBData['Sprint'] = False
else: else:
PBData['Sprint'] = round( PBData['Sprint'] = round(
float(userInfo['Data']['data']['PBSprint']) / 1000, 2) float(userInfo['data']['PBSprint']) / 1000, 2)
if int(userInfo['Data']['data']['PBMarathon']) == 0: if int(userInfo['data']['PBMarathon']) == 0:
PBData['Marathon'] = False PBData['Marathon'] = False
else: else:
PBData['Marathon'] = userInfo['Data']['data']['PBMarathon'] PBData['Marathon'] = userInfo['data']['PBMarathon']
if int(userInfo['Data']['data']['PBChallenge']) == 0: if int(userInfo['data']['PBChallenge']) == 0:
PBData['Challenge'] = False PBData['Challenge'] = False
else: else:
PBData['Challenge'] = userInfo['Data']['data']['PBChallenge'] PBData['Challenge'] = userInfo['data']['PBChallenge']
return PBData return PBData
async def generateMessage(userName: str = None, teaID: int = None) -> str: async def generateMessage(userName: str = None, teaID: int = None) -> str:
# 生成消息 # 生成消息
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID)) userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo['Status'] is False: if userInfo[0] is False:
return f'用户信息请求失败' return f'用户信息请求失败'
if userInfo['Success'] is False: elif userInfo[1] is False:
return f'用户信息请求错误:\n{userInfo["Data"]["error"]}' return f'用户信息请求错误:\n{userInfo[2]["error"]}'
rankStats = await getRankStats(userInfo) rankStats, PBData = await gather(getRankStats(userInfo[2]), getPBData(userInfo[2]))
PBData = await getPBData(userInfo)
message = '' message = ''
if rankStats['Played'] is False: if rankStats['Played'] is False:
message += f'用户 {userInfo["Data"]["data"]["name"]}{userInfo["Data"]["data"]["teaId"]})暂无段位统计数据' message += f'用户 {userInfo[2]["data"]["name"]}{userInfo[2]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True: elif rankStats['Played'] is True:
message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) ' message += f'用户 {userInfo[2]["data"]["name"]} ({userInfo[2]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData['Status'] is False: if userData[0] is False:
message = f'{message.rstrip()}\n游戏数据请求失败' message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData['Status'] is True: elif userData[1] is False:
gameData = await getGameData(userData) message = f'{message.rstrip()}\n游戏数据请求错误:\n{userData[2]["error"]}'
else:
gameData = await getGameData(userData[2])
if gameData['Played'] is False: if gameData['Played'] is False:
message += ', 暂无游戏数据' message += ', 暂无游戏数据'
elif gameData['Played'] is True: elif gameData['Played'] is True:

View File

@@ -0,0 +1 @@
from . import IODataProcessor, TOSDataProcessor

View File

@@ -1,168 +0,0 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
async def request(Url: str) -> dict[str, bool | dict[str, any]]:
# 封装请求函数
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
# 获取用户数据
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
userData = await request(Url=userDataUrl)
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误')
return userData
async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
# 获取Solo数据
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
soloData = await request(Url=userSoloUrl)
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
soloData = await request(Url=userSoloUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误')
return soloData
async def getUserIDInfo(userData: dict = None, userName: str = None, userID: str = None) -> dict[str, bool | str]:
# 获取用户ID
if userData is not None and userName is None and userID is None:
userID = userData['Data']['data']['user']['id']
elif userData is None and userName is not None and userID is None:
userData = await getUserData(userName=userName)
if userData['Status'] is False:
return {'Success': False, 'Message': '用户信息请求失败'}
if userData['Success'] is False:
return {'Success': False, 'Message': f'用户信息请求错误:\n{userData["Data"]["error"]}'}
userID = userData['Data']['data']['user']['id']
elif userData is None and userName is None and userID is not None:
userData = await getUserData(userID=userID)
if userData['Status'] is False:
return {'Success': False, 'Message': '用户信息请求失败'}
if userData['Success'] is False:
return {'Success': False, 'Message': f'用户信息请求错误:\n{userData["Data"]["error"]}'}
userID = userData['Data']['data']['user']['id']
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserIDInfo: 参数错误')
return {'Success': True, 'userID': userID}
async def getLeagueStats(userData: dict) -> dict[str, bool | int | str | float]:
# 获取排位统计数据
league = userData['Data']['data']['user']['league']
leagueStats = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round(
(leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round(
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
async def getSprintStats(soloData: dict) -> dict[str, bool | int | float]:
# 获取40L统计数据
sprintStats = {}
if soloData['Data']['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
if soloData['Data']['data']['records']['40l']['rank'] is None:
sprintStats['Rank'] = False
else:
sprintStats['Rank'] = soloData['Data']['data']['records']['40l']['rank']
sprintStats['Time'] = round(
soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
async def getBlitzStats(soloData: dict) -> dict[str, bool | int]:
# 获取Blitz统计数据
blitzStats = {}
if soloData['Data']['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
if soloData['Data']['data']['records']['blitz']['rank'] is None:
blitzStats['Rank'] = False
else:
blitzStats['Rank'] = soloData['Data']['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
async def generateMessage(userName: str = None, userID: str = None) -> str:
# 生成消息
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
if userData['Status'] is False:
return '用户信息请求失败'
if userData['Success'] is False:
return f'用户信息请求错误:\n{userData["Data"]["error"]}'
userName = userData['Data']['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData)
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData['Status'] is False:
return f'{message}\nSolo统计数据请求失败'
sprintStats = await getSprintStats(soloData)
blitzStats = await getBlitzStats(soloData)
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -1,70 +0,0 @@
from re import match
async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
return await checkName(message, gameType)
async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '', '', 'me']
message = (message.strip()).lower()
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
if message.startswith('[cq:at,qq='):
try:
QQNumber = int((str(message)).split(
'[cq:at,qq=')[1].split(']')[0])
except ValueError:
return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'}
else:
return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber}
elif message in _ME:
return {'Success': True, 'Type': 'ME', 'Message': None}
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> dict[str, bool | str]:
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name}
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
elif name.isdigit() is True:
return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}

View File

@@ -0,0 +1,79 @@
from re import match, sub
async def handleBindMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError(
'[TETRIS STATS] MessageAnalyzer.handleBindMessage: 预期外行为请上报GitHub')
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
return await checkName(message, gameType)
async def handleStatsQueryMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷',
'鄙人', '寡人', '小生', '贫僧', '本人', '', '', '', '', '', 'me']
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
if message.startswith('[cq:at,qq='):
try:
user = int(str(message).split('[cq:at,qq=')[1].split(']')[0])
except ValueError:
return (None, ('QQ号码不合法', None))
else:
return ('AT', (None, user))
elif message in _ME:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return ('ME', (None, None))
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return ('ID', (None, name))
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return ('Name', (None, name))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return ('Name', (None, name))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return ('Name', (None, name))
elif name.isdigit() is True:
return ('QQ', (None, name))
else:
return (None, ('用户名不合法', None))
else:
return (None, ('游戏类型错误', None))

View File

@@ -0,0 +1,17 @@
from nonebot.log import logger
from typing import Any
import aiohttp
async def request(Url: str) -> tuple[bool, bool, dict[str, Any]]:
# 封装请求函数
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data = await resp.json()
return (True, data['success'], data)
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] request.request: 请求错误\n{e}')
return (False, False, {})

View File

@@ -3,13 +3,17 @@ from nonebot.log import logger
import sqlite3 import sqlite3
import os import os
_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db' _DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
async def initDB(): async def initDB():
# 初始化数据库 # 初始化数据库
if not os.path.exists(os.path.dirname(_DB_FILE)): if not os.path.exists(os.path.dirname(_DB_FILE)):
os.makedirs(os.path.dirname(_DB_FILE)) if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
os.rename('data/nonebot-plugin-tetris-stats',
os.path.dirname(_DB_FILE))
else:
os.makedirs(os.path.dirname(_DB_FILE))
db = sqlite3.connect(_DB_FILE) db = sqlite3.connect(_DB_FILE)
cursor = db.cursor() cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
@@ -23,7 +27,7 @@ async def initDB():
logger.info('数据库初始化完成') logger.info('数据库初始化完成')
async def queryBindInfo(QQNumber: int, gameType: str) -> dict: async def queryBindInfo(QQNumber: str | int, gameType: str) -> str | None:
# 查询绑定信息 # 查询绑定信息
db = sqlite3.connect(_DB_FILE) db = sqlite3.connect(_DB_FILE)
cursor = db.cursor() cursor = db.cursor()
@@ -32,21 +36,21 @@ async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
db.commit() db.commit()
db.close() db.close()
if user is None: if user is None:
return {'Hit': False, 'User': None} return None
else: else:
return {'Hit': True, 'User': user[0]} return user[0]
async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str: async def writeBindInfo(QQNumber: str | int, user: str, gameType: str) -> str:
# 写入绑定信息 # 写入绑定信息
info = await queryBindInfo(QQNumber, gameType) bindInfo = await queryBindInfo(QQNumber, gameType)
db = sqlite3.connect(_DB_FILE) db = sqlite3.connect(_DB_FILE)
cursor = db.cursor() cursor = db.cursor()
if info['Hit'] is True: if bindInfo is not None:
cursor.execute( cursor.execute(
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber)) f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
message = '更新成功' message = '更新成功'
elif info['Hit'] is False: elif bindInfo is None:
cursor.execute( cursor.execute(
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user)) f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
message = '绑定成功' message = '绑定成功'

View File

@@ -0,0 +1 @@
from . import MessageAnalyzer, Request, SQL

View File

@@ -1,15 +1,9 @@
from re import I from nonebot import get_driver
from nonebot import get_driver, on_regex from .Utils.SQL import initDB
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from .MessageAnalyzer import * from . import GameDataProcessor
from .SQL import *
from .IODataProcessing import generateMessage as IOgenerateMessage
from .IODataProcessing import getUserIDInfo as IOgetUserIDInfo
from .TOSDataProcessing import generateMessage as TOSgenerateMessage
driver = get_driver() driver = get_driver()
@@ -17,84 +11,3 @@ driver = get_driver()
@driver.on_startup @driver.on_startup
async def startUP(): async def startUP():
await initDB() await initDB()
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@ioBind.handle()
async def bindIOUser(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'ID':
userIDInfo = await IOgetUserIDInfo(userID=decodedMessage['User'])
if userIDInfo['Success'] is False:
await matcher.finish(message=userIDInfo['Message'])
elif decodedMessage['Type'] == 'Name':
userIDInfo = await IOgetUserIDInfo(userName=decodedMessage['User'])
if userIDInfo['Success'] is False:
await matcher.finish(message=userIDInfo['Message'])
message = await writeBindInfo(QQNumber=event.sender.user_id, user=userIDInfo['userID'], gameType='IO')
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@ioStats.handle()
async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '未查询到绑定信息'
elif decodedMessage['Type'] == 'ME':
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '您还没有绑定账号'
elif decodedMessage['Type'] == 'ID':
message = await IOgenerateMessage(userID=decodedMessage['User'])
elif decodedMessage['Type'] == 'Name':
message = await IOgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@tosStats.handle()
async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ':
if decodedMessage['QQNumber'] == event.self_id:
await matcher.finish(message='不能查询bot的信息')
message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber'])
elif decodedMessage['Type'] == 'ME':
message = await TOSgenerateMessage(teaID=event.sender.user_id)
elif decodedMessage['Type'] == 'Name':
message = await TOSgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@topBind.handle()
async def bindTOPUser(event: MessageEvent, matcher: Matcher):
await matcher.finish(message='TODO')
@topStats.handle()
async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher):
await matcher.finish(message='TODO')

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "nonebot-plugin-tetris-stats" name = "nonebot-plugin-tetris-stats"
version = "0.1.1" version = "0.2.0"
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件" description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
authors = ["scdhh <wallfjjd@gmail.com>"] authors = ["scdhh <wallfjjd@gmail.com>"]
readme = "README.md" readme = "README.md"