diff --git a/.gitignore b/.gitignore index 1521c8b..a4e4801 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ dist +test* +Untitled* +*copy* diff --git a/nonebot_plugin_tetris_stats/GameDataProcessor/IODataProcessor.py b/nonebot_plugin_tetris_stats/GameDataProcessor/IODataProcessor.py new file mode 100644 index 0000000..6d3a994 --- /dev/null +++ b/nonebot_plugin_tetris_stats/GameDataProcessor/IODataProcessor.py @@ -0,0 +1,211 @@ +from nonebot import on_regex +from nonebot.adapters.onebot.v11 import GROUP, MessageEvent +from nonebot.matcher import Matcher +from nonebot.log import logger + +from typing import Any, Mapping +from asyncio import gather +from re import I + +from ..Utils.Request import request + +from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage +from ..Utils.SQL import queryBindInfo, writeBindInfo + +ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP) +ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP) + + +@ioBind.handle() +async def _(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO') + if decodedMessage[0] is None: + await matcher.finish(decodedMessage[1][0]) + if decodedMessage[0] == 'ID': + userIDStats = await checkUserID(userID=decodedMessage[1][1]) + if userIDStats[0] is False: + await matcher.finish(userIDStats[1]) + else: + userID = decodedMessage[1][1] + elif decodedMessage[0] == 'Name': + userData = await getUserData(userName=decodedMessage[1][1]) + if userData[0] is False: + await matcher.finish('用户信息请求失败') + elif userData[1] is False: + await matcher.finish(f'用户信息请求错误:\n{userData[2]["error"]}') + else: + userID = await getUserID(userData=userData[2]) + if event.sender.user_id is None: # 理论上是不会有None出现的,ide快乐行属于是( + logger.error('获取QQ号失败') + await matcher.finish('获取QQ号失败') + await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userID, gameType='IO')) + + +@ioStats.handle() +async def _(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO') + if decodedMessage[0] is None: + await matcher.finish(decodedMessage[1][0]) + elif decodedMessage[0] == 'AT': + if event.is_tome() is True: + await matcher.finish(message='不能查询bot的信息') + bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='IO') + if bindInfo is None: + message = '未查询到绑定信息' + else: + message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}') + elif decodedMessage[0] == 'ME': + if event.sender.user_id is None: + logger.error('获取QQ号失败') + await matcher.finish('获取QQ号失败,请联系bot主人') + bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO') + if bindInfo is None: + message = '未查询到绑定信息' + else: + message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}') + elif decodedMessage[0] == 'ID': + message = await generateMessage(userID=decodedMessage[1][1]) + elif decodedMessage[0] == 'Name': + message = await generateMessage(userName=decodedMessage[1][1]) + await matcher.finish(message=message) + + +async def getUserData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]: + # 获取用户数据 + if userName is not None and userID is None: + userDataUrl = f'https://ch.tetr.io/api/users/{userName}' + elif userName is None and userID is not None: + userDataUrl = f'https://ch.tetr.io/api/users/{userID}' + else: + raise ValueError( + '[TETRIS STATS] IODataProcessing.getUserData: 预期外行为,请上报GitHub') + return await request(Url=userDataUrl) + + +async def getSoloData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]: + # 获取Solo数据 + if userName is not None and userID is None: + userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records' + elif userName is None and userID is not None: + userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records' + else: + raise ValueError( + '[TETRIS STATS] IODataProcessing.getSoloData: 预期外行为,请上报GitHub') + return await request(Url=userSoloUrl) + + +async def getUserID(userData: dict) -> str: + return userData['data']['user']['_id'] + + +async def checkUserID(userID: str) -> tuple[bool, str]: + userData = await getUserData(userID=userID) + if userData[0] is False: + return (False, '用户信息请求失败') + elif userData[1] is False: + return (False, f'用户信息请求错误:\n{userData[2]["error"]}') + elif userID == userData[2]['data']['user']['_id']: + return (True, '') + else: + raise ValueError( + '[TETRIS STATS] IODataProcessing.checkUserID: 服务器返回的userID和用户提供的不一致,这种情况理论上不应该发生,以防万一还是写一下(x') + + +async def getLeagueStats(userData: dict) -> dict[str, Any]: + # 获取排位统计数据 + league = userData['data']['user']['league'] + leagueStats: dict[str, Any] = {} + if league['gamesplayed'] == 0: + leagueStats['Played'] = False + else: + leagueStats['Played'] = True + leagueStats['PPS'] = league['pps'] + leagueStats['APM'] = league['apm'] + leagueStats['VS'] = 0 if league['vs'] is None else league['vs'] + leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper() + if league['rating'] != -1: + leagueStats['Ranked'] = True + leagueStats['Rating'] = round(league['rating'], 2) + leagueStats['Glicko'] = round(league['glicko'], 2) + leagueStats['RD'] = round(league['rd'], 2) + else: + leagueStats['Ranked'] = False + leagueStats['Standing'] = league['standing'] + leagueStats['LPM'] = round((league['pps'] * 24), 2) + leagueStats['APL'] = round( + (leagueStats['APM'] / leagueStats['LPM']), 2) + leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2) + leagueStats['ADPL'] = round( + (leagueStats['ADPM'] / leagueStats['LPM']), 2) + return leagueStats + + +async def getSprintStats(soloData: dict) -> Mapping[str, bool | int | float]: + # 获取40L统计数据 + sprintStats = {} + if soloData['data']['records']['40l']['record'] is None: + sprintStats['Played'] = False + else: + sprintStats['Played'] = True + if soloData['data']['records']['40l']['rank'] is None: + sprintStats['Rank'] = False + else: + sprintStats['Rank'] = soloData['data']['records']['40l']['rank'] + sprintStats['Time'] = round( + soloData['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2) + return sprintStats + + +async def getBlitzStats(soloData: dict) -> dict[str, Any]: + # 获取Blitz统计数据 + blitzStats = {} + if soloData['data']['records']['blitz']['record'] is None: + blitzStats['Played'] = False + else: + blitzStats['Played'] = True + if soloData['data']['records']['blitz']['rank'] is None: + blitzStats['Rank'] = False + else: + blitzStats['Rank'] = soloData['data']['records']['blitz']['rank'] + blitzStats['Score'] = soloData['data']['records']['blitz']['record']['endcontext']['score'] + return blitzStats + + +async def generateMessage(userName: str = None, userID: str = None) -> str: + # 生成消息 + userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID)) + if userData[0] is False: + return '用户信息请求失败' + elif userData[1] is False: + return f'用户信息请求错误:\n{userData[2]["error"]}' + userName = userData[2]['data']['user']['username'].upper() + leagueStats = await getLeagueStats(userData[2]) + message = '' + if leagueStats['Played'] is False: + message += f'用户 {userName} 没有排位统计数据' + else: + if leagueStats['Rank'] is False and leagueStats['Ranked'] is False: + message += f'用户 {userName} 暂未完成定级赛' + elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True: + message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR' + else: + message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})' + message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:' + message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )' + message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )' + if leagueStats["VS"] != 0: + message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )' + if soloData[0] is False: + return f'{message}\nSolo统计数据请求失败' + elif soloData[1] is False: + return f'{message}\nSolo统计数据请求错误:\n{soloData[2]["error"]}' + sprintStats, blitzStats = await gather(getSprintStats(soloData[2]), getBlitzStats(soloData[2])) + if sprintStats['Played'] is True: + message += f'\n40L: {sprintStats["Time"]}s' + if sprintStats['Rank'] is not False: + message += f' ( #{sprintStats["Rank"]} )' + if blitzStats['Played'] is True: + message += f'\nBlitz: {blitzStats["Score"]}' + if blitzStats['Rank'] is not False: + message += f' ( #{blitzStats["Rank"]} )' + return message diff --git a/nonebot_plugin_tetris_stats/TOSDataProcessing.py b/nonebot_plugin_tetris_stats/GameDataProcessor/TOSDataProcessor.py similarity index 54% rename from nonebot_plugin_tetris_stats/TOSDataProcessing.py rename to nonebot_plugin_tetris_stats/GameDataProcessor/TOSDataProcessor.py index 079d5b1..b537218 100644 --- a/nonebot_plugin_tetris_stats/TOSDataProcessing.py +++ b/nonebot_plugin_tetris_stats/GameDataProcessor/TOSDataProcessor.py @@ -1,74 +1,83 @@ -from nonebot.log import logger +from nonebot import on_regex +from nonebot.adapters.onebot.v11 import GROUP, MessageEvent +from nonebot.matcher import Matcher +from typing import Any from asyncio import gather -import aiohttp +from re import I + +from ..Utils.Request import request + +from ..Utils.MessageAnalyzer import * +from ..Utils.SQL import * + +tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats', + flags=I, permission=GROUP) -async def request(Url: str) -> dict[str, bool | dict[str, any]]: - # 封装请求函数 - data = {} - try: - async with aiohttp.ClientSession() as session: - async with session.get(Url) as resp: - data['Status'] = True - data['Data'] = await resp.json() - data['Success'] = data['Data']['success'] - except aiohttp.client_exceptions.ClientConnectorError as e: - logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}') - data['Status'] = False - finally: - return data +@tosStats.handle() +async def _(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS') + if decodedMessage[0] is None: + await matcher.finish(decodedMessage[1][0]) + elif decodedMessage[0] == 'AT' or decodedMessage[0] == 'QQ': + if decodedMessage[1][1] == event.self_id: + await matcher.finish(message='不能查询bot的信息') + message = await generateMessage(teaID=decodedMessage[1][1]) + elif decodedMessage[0] == 'ME': + message = await generateMessage(teaID=event.sender.user_id) + elif decodedMessage[0] == 'Name': + message = await generateMessage(userName=decodedMessage[1][1]) + await matcher.finish(message=message) -async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool | dict[str, any]]: +async def getUserInfo(userName: str = None, teaID: int = None) -> tuple[bool, bool, dict[str, Any]]: # 获取用户信息 if userName is not None and teaID is None: userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}' - userInfo = await request(Url=userDataUrl) - elif teaID is not None and userName is None: + elif userName is None and teaID is not None: userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}' - userInfo = await request(Url=userDataUrl) else: - raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误') - return userInfo + raise ValueError( + '[TETRIS STATS] TOSDataProcessing.getUserInfo: 预期外行为,请上报GitHub') + return await request(Url=userDataUrl) -async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool | dict[str, any]]: +async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> tuple[bool, bool, dict[str, Any]]: # 获取用户数据 if userName is not None and teaID is None: userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}' - userData = await request(Url=userDataUrl) - elif teaID is not None and userName is None: + elif userName is None and teaID is not None: userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}' - userData = await request(Url=userDataUrl) else: - raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误') - return userData + raise ValueError( + '[TETRIS STATS] TOSDataProcessing.getUserData: 预期外行为,请上报GitHub') + return await request(Url=userDataUrl) async def getRankStats(userInfo: dict) -> dict[str, bool | float]: # 获取Rank数据 - rankStats = {} - if int(userInfo['Data']['data']['rankedGames']) == 0: + rankStats: dict[str, bool | float] = {} + if int(userInfo['data']['rankedGames']) == 0: rankStats['Played'] = False else: rankStats['Played'] = True rankStats['Rating'] = round( - float(userInfo['Data']['data']['ratingNow']), 2) - rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2) - rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3) + float(userInfo['data']['ratingNow']), 2) + rankStats['RD'] = round(float(userInfo['data']['rdNow']), 2) + rankStats['Vol'] = round(float(userInfo['data']['volNow']), 3) return rankStats async def getGameData(userData: dict) -> dict[str, bool | int | float]: # 获取游戏数据 - gameData = {} - if userData['Data']['data'] == []: + gameData: dict[str, bool | int | float] = {} + if userData['data'] == []: gameData['Played'] = False else: gameData['Played'] = True weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0 - for i in userData['Data']['data']: + for i in userData['data']: # 排除单人局和时间为0的游戏 if i['num_players'] == 1 or i['time'] == 0: continue @@ -95,47 +104,48 @@ async def getGameData(userData: dict) -> dict[str, bool | int | float]: gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2) gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2) gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2) - # TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息 + # TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息 return gameData async def getPBData(userInfo: dict) -> dict[str, bool | float | str]: # 获取PB数据 - PBData = {} - if int(userInfo['Data']['data']['PBSprint']) == 2147483647: + PBData: dict[str, bool | float | str] = {} + if int(userInfo['data']['PBSprint']) == 2147483647: PBData['Sprint'] = False else: PBData['Sprint'] = round( - float(userInfo['Data']['data']['PBSprint']) / 1000, 2) - if int(userInfo['Data']['data']['PBMarathon']) == 0: + float(userInfo['data']['PBSprint']) / 1000, 2) + if int(userInfo['data']['PBMarathon']) == 0: PBData['Marathon'] = False else: - PBData['Marathon'] = userInfo['Data']['data']['PBMarathon'] - if int(userInfo['Data']['data']['PBChallenge']) == 0: + PBData['Marathon'] = userInfo['data']['PBMarathon'] + if int(userInfo['data']['PBChallenge']) == 0: PBData['Challenge'] = False else: - PBData['Challenge'] = userInfo['Data']['data']['PBChallenge'] + PBData['Challenge'] = userInfo['data']['PBChallenge'] return PBData async def generateMessage(userName: str = None, teaID: int = None) -> str: # 生成消息 userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID)) - if userInfo['Status'] is False: + if userInfo[0] is False: return f'用户信息请求失败' - if userInfo['Success'] is False: - return f'用户信息请求错误:\n{userInfo["Data"]["error"]}' - rankStats = await getRankStats(userInfo) - PBData = await getPBData(userInfo) + elif userInfo[1] is False: + return f'用户信息请求错误:\n{userInfo[2]["error"]}' + rankStats, PBData = await gather(getRankStats(userInfo[2]), getPBData(userInfo[2])) message = '' if rankStats['Played'] is False: - message += f'用户 {userInfo["Data"]["data"]["name"]}({userInfo["Data"]["data"]["teaId"]})暂无段位统计数据' + message += f'用户 {userInfo[2]["data"]["name"]}({userInfo[2]["data"]["teaId"]})暂无段位统计数据' elif rankStats['Played'] is True: - message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) ' - if userData['Status'] is False: + message += f'用户 {userInfo[2]["data"]["name"]} ({userInfo[2]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) ' + if userData[0] is False: message = f'{message.rstrip()}\n游戏数据请求失败' - elif userData['Status'] is True: - gameData = await getGameData(userData) + elif userData[1] is False: + message = f'{message.rstrip()}\n游戏数据请求错误:\n{userData[2]["error"]}' + else: + gameData = await getGameData(userData[2]) if gameData['Played'] is False: message += ', 暂无游戏数据' elif gameData['Played'] is True: diff --git a/nonebot_plugin_tetris_stats/GameDataProcessor/__init__.py b/nonebot_plugin_tetris_stats/GameDataProcessor/__init__.py new file mode 100644 index 0000000..80027b4 --- /dev/null +++ b/nonebot_plugin_tetris_stats/GameDataProcessor/__init__.py @@ -0,0 +1 @@ +from . import IODataProcessor, TOSDataProcessor diff --git a/nonebot_plugin_tetris_stats/IODataProcessing.py b/nonebot_plugin_tetris_stats/IODataProcessing.py deleted file mode 100644 index 38ec4a7..0000000 --- a/nonebot_plugin_tetris_stats/IODataProcessing.py +++ /dev/null @@ -1,168 +0,0 @@ -from nonebot.log import logger - -from asyncio import gather -import aiohttp - - -async def request(Url: str) -> dict[str, bool | dict[str, any]]: - # 封装请求函数 - data = {} - try: - async with aiohttp.ClientSession() as session: - async with session.get(Url) as resp: - data['Status'] = True - data['Data'] = await resp.json() - data['Success'] = data['Data']['success'] - except aiohttp.client_exceptions.ClientConnectorError as e: - logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}') - data['Status'] = False - finally: - return data - - -async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: - # 获取用户数据 - if userName is not None and userID is None: - userDataUrl = f'https://ch.tetr.io/api/users/{userName}' - userData = await request(Url=userDataUrl) - elif userName is None and userID is not None: - userDataUrl = f'https://ch.tetr.io/api/users/{userID}' - userData = await request(Url=userDataUrl) - else: - raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误') - return userData - - -async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: - # 获取Solo数据 - if userName is not None and userID is None: - userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records' - soloData = await request(Url=userSoloUrl) - elif userName is None and userID is not None: - userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records' - soloData = await request(Url=userSoloUrl) - else: - raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误') - return soloData - - -async def getUserIDInfo(userData: dict = None, userName: str = None, userID: str = None) -> dict[str, bool | str]: - # 获取用户ID - if userData is not None and userName is None and userID is None: - userID = userData['Data']['data']['user']['id'] - elif userData is None and userName is not None and userID is None: - userData = await getUserData(userName=userName) - if userData['Status'] is False: - return {'Success': False, 'Message': '用户信息请求失败'} - if userData['Success'] is False: - return {'Success': False, 'Message': f'用户信息请求错误:\n{userData["Data"]["error"]}'} - userID = userData['Data']['data']['user']['id'] - elif userData is None and userName is None and userID is not None: - userData = await getUserData(userID=userID) - if userData['Status'] is False: - return {'Success': False, 'Message': '用户信息请求失败'} - if userData['Success'] is False: - return {'Success': False, 'Message': f'用户信息请求错误:\n{userData["Data"]["error"]}'} - userID = userData['Data']['data']['user']['id'] - else: - raise ValueError('[TETRIS STATS] IODataProcessing.getUserIDInfo: 参数错误') - return {'Success': True, 'userID': userID} - - -async def getLeagueStats(userData: dict) -> dict[str, bool | int | str | float]: - # 获取排位统计数据 - league = userData['Data']['data']['user']['league'] - leagueStats = {} - if league['gamesplayed'] == 0: - leagueStats['Played'] = False - else: - leagueStats['Played'] = True - leagueStats['PPS'] = league['pps'] - leagueStats['APM'] = league['apm'] - leagueStats['VS'] = 0 if league['vs'] is None else league['vs'] - leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper() - if league['rating'] != -1: - leagueStats['Ranked'] = True - leagueStats['Rating'] = round(league['rating'], 2) - leagueStats['Glicko'] = round(league['glicko'], 2) - leagueStats['RD'] = round(league['rd'], 2) - else: - leagueStats['Ranked'] = False - leagueStats['Standing'] = league['standing'] - leagueStats['LPM'] = round((league['pps'] * 24), 2) - leagueStats['APL'] = round( - (leagueStats['APM'] / leagueStats['LPM']), 2) - leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2) - leagueStats['ADPL'] = round( - (leagueStats['ADPM'] / leagueStats['LPM']), 2) - return leagueStats - - -async def getSprintStats(soloData: dict) -> dict[str, bool | int | float]: - # 获取40L统计数据 - sprintStats = {} - if soloData['Data']['data']['records']['40l']['record'] is None: - sprintStats['Played'] = False - else: - sprintStats['Played'] = True - if soloData['Data']['data']['records']['40l']['rank'] is None: - sprintStats['Rank'] = False - else: - sprintStats['Rank'] = soloData['Data']['data']['records']['40l']['rank'] - sprintStats['Time'] = round( - soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2) - return sprintStats - - -async def getBlitzStats(soloData: dict) -> dict[str, bool | int]: - # 获取Blitz统计数据 - blitzStats = {} - if soloData['Data']['data']['records']['blitz']['record'] is None: - blitzStats['Played'] = False - else: - blitzStats['Played'] = True - if soloData['Data']['data']['records']['blitz']['rank'] is None: - blitzStats['Rank'] = False - else: - blitzStats['Rank'] = soloData['Data']['data']['records']['blitz']['rank'] - blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score'] - return blitzStats - - -async def generateMessage(userName: str = None, userID: str = None) -> str: - # 生成消息 - userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID)) - if userData['Status'] is False: - return '用户信息请求失败' - if userData['Success'] is False: - return f'用户信息请求错误:\n{userData["Data"]["error"]}' - userName = userData['Data']['data']['user']['username'].upper() - leagueStats = await getLeagueStats(userData) - message = '' - if leagueStats['Played'] is False: - message += f'用户 {userName} 没有排位统计数据' - else: - if leagueStats['Rank'] is False and leagueStats['Ranked'] is False: - message += f'用户 {userName} 暂未完成定级赛' - elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True: - message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR' - else: - message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})' - message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:' - message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )' - message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )' - if leagueStats["VS"] != 0: - message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )' - if soloData['Status'] is False: - return f'{message}\nSolo统计数据请求失败' - sprintStats = await getSprintStats(soloData) - blitzStats = await getBlitzStats(soloData) - if sprintStats['Played'] is True: - message += f'\n40L: {sprintStats["Time"]}s' - if sprintStats['Rank'] is not False: - message += f' ( #{sprintStats["Rank"]} )' - if blitzStats['Played'] is True: - message += f'\nBlitz: {blitzStats["Score"]}' - if blitzStats['Rank'] is not False: - message += f' ( #{blitzStats["Rank"]} )' - return message diff --git a/nonebot_plugin_tetris_stats/MessageAnalyzer.py b/nonebot_plugin_tetris_stats/MessageAnalyzer.py deleted file mode 100644 index 37fe87f..0000000 --- a/nonebot_plugin_tetris_stats/MessageAnalyzer.py +++ /dev/null @@ -1,70 +0,0 @@ -from re import match - - -async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]: - _CMD_ALIASES = {'IO': ['io绑定', 'iobind'], - 'TOP': ['top绑定', 'topbind']} - # 剔除命令前缀 - for i in _CMD_ALIASES[gameType]: - if message.startswith(i): - message = message.replace(i, '') - message = message.strip() - break - if message == '' or message.isspace(): - return {'Success': False, 'Type': None, 'Message': '用户名为空'} - else: - return await checkName(message, gameType) - - -async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]: - _CMD_ALIASES = {'IO': ['io查', 'iostats'], - 'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'], - 'TOP': ['top查', 'topstats']} - _ME = ['我', '自己', '私', '俺', 'me'] - message = (message.strip()).lower() - # 剔除命令前缀 - for i in _CMD_ALIASES[gameType]: - if message.startswith(i): - message = message.replace(i, '') - message = message.strip() - break - if message == '' or message.isspace(): - return {'Success': False, 'Type': None, 'Message': '用户名为空'} - else: - if message.startswith('[cq:at,qq='): - try: - QQNumber = int((str(message)).split( - '[cq:at,qq=')[1].split(']')[0]) - except ValueError: - return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'} - else: - return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber} - elif message in _ME: - return {'Success': True, 'Type': 'ME', 'Message': None} - else: - return await checkName(message, gameType) - - -async def checkName(name: str, gameType: str) -> dict[str, bool | str]: - if gameType == 'IO': - if match(r'^[a-f0-9]{24}$', name): - return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name} - elif match(r'^[a-zA-Z0-9_-]{3,16}$', name): - return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} - else: - return {'Success': False, 'Type': None, 'Message': '用户名不合法'} - elif gameType == 'TOP': - if match(r'^[a-zA-Z0-9_]{1,16}$', name): - return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} - else: - return {'Success': False, 'Type': None, 'Message': '用户名不合法'} - elif gameType == 'TOS': - if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name) - and name.isdigit() is False - and 2 <= len(name) <= 18): - # 虽然我也不想这么长 但是似乎确实得这么长 - return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} - elif name.isdigit() is True: - return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name} - else: - return {'Success': False, 'Type': None, 'Message': '用户名不合法'} diff --git a/nonebot_plugin_tetris_stats/Utils/MessageAnalyzer.py b/nonebot_plugin_tetris_stats/Utils/MessageAnalyzer.py new file mode 100644 index 0000000..7a22d65 --- /dev/null +++ b/nonebot_plugin_tetris_stats/Utils/MessageAnalyzer.py @@ -0,0 +1,79 @@ +from re import match, sub + + +async def handleBindMessage(message: str, gameType: str) -> tuple[str | None, tuple]: + '''返回值为tuple[gameType, tuple[message, user]]''' + _CMD_ALIASES = {'IO': ['io绑定', 'iobind'], + 'TOP': ['top绑定', 'topbind']} + # 剔除命令前缀 + for i in _CMD_ALIASES[gameType]: + if message.startswith(i): + message = sub(rf'(?i){i}', '', message) + message = message.strip() + break + else: + raise ValueError( + '[TETRIS STATS] MessageAnalyzer.handleBindMessage: 预期外行为,请上报GitHub') + if message == '' or message.isspace(): + return (None, ('用户名为空', None)) + else: + return await checkName(message, gameType) + + +async def handleStatsQueryMessage(message: str, gameType: str) -> tuple[str | None, tuple]: + '''返回值为tuple[gameType, tuple[message, user]]''' + _CMD_ALIASES = {'IO': ['io查', 'iostats'], + 'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'], + 'TOP': ['top查', 'topstats']} + _ME = ['我', '自己', '我等', '卑人', '愚', '老身', '爷', '老娘', '本姑娘', '本大爷', + '鄙人', '寡人', '小生', '贫僧', '本人', '孤', '吾', '俺', '咱', '私', 'me'] + # 剔除命令前缀 + for i in _CMD_ALIASES[gameType]: + if message.startswith(i): + message = sub(rf'(?i){i}', '', message) + message = message.strip() + break + if message == '' or message.isspace(): + return (None, ('用户名为空', None)) + else: + if message.startswith('[cq:at,qq='): + try: + user = int(str(message).split('[cq:at,qq=')[1].split(']')[0]) + except ValueError: + return (None, ('QQ号码不合法', None)) + else: + return ('AT', (None, user)) + elif message in _ME: + # 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢 + # TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName + return ('ME', (None, None)) + else: + return await checkName(message, gameType) + + +async def checkName(name: str, gameType: str) -> tuple[str | None, tuple]: + '''返回值为tuple[gameType, tuple[message, user]]''' + if gameType == 'IO': + if match(r'^[a-f0-9]{24}$', name): + return ('ID', (None, name)) + elif match(r'^[a-zA-Z0-9_-]{3,16}$', name): + return ('Name', (None, name)) + else: + return (None, ('用户名不合法', None)) + elif gameType == 'TOP': + if match(r'^[a-zA-Z0-9_]{1,16}$', name): + return ('Name', (None, name)) + else: + return (None, ('用户名不合法', None)) + elif gameType == 'TOS': + if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name) + and name.isdigit() is False + and 2 <= len(name) <= 18): + # 虽然我也不想这么长 但是似乎确实得这么长 + return ('Name', (None, name)) + elif name.isdigit() is True: + return ('QQ', (None, name)) + else: + return (None, ('用户名不合法', None)) + else: + return (None, ('游戏类型错误', None)) diff --git a/nonebot_plugin_tetris_stats/Utils/Request.py b/nonebot_plugin_tetris_stats/Utils/Request.py new file mode 100644 index 0000000..87d0729 --- /dev/null +++ b/nonebot_plugin_tetris_stats/Utils/Request.py @@ -0,0 +1,17 @@ +from nonebot.log import logger + +from typing import Any + +import aiohttp + + +async def request(Url: str) -> tuple[bool, bool, dict[str, Any]]: + # 封装请求函数 + try: + async with aiohttp.ClientSession() as session: + async with session.get(Url) as resp: + data = await resp.json() + return (True, data['success'], data) + except aiohttp.client_exceptions.ClientConnectorError as e: + logger.error(f'[TETRIS STATS] request.request: 请求错误\n{e}') + return (False, False, {}) diff --git a/nonebot_plugin_tetris_stats/SQL.py b/nonebot_plugin_tetris_stats/Utils/SQL.py similarity index 66% rename from nonebot_plugin_tetris_stats/SQL.py rename to nonebot_plugin_tetris_stats/Utils/SQL.py index 34b172f..7f70d16 100644 --- a/nonebot_plugin_tetris_stats/SQL.py +++ b/nonebot_plugin_tetris_stats/Utils/SQL.py @@ -3,13 +3,17 @@ from nonebot.log import logger import sqlite3 import os -_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db' +_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db' async def initDB(): # 初始化数据库 if not os.path.exists(os.path.dirname(_DB_FILE)): - os.makedirs(os.path.dirname(_DB_FILE)) + if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径 + os.rename('data/nonebot-plugin-tetris-stats', + os.path.dirname(_DB_FILE)) + else: + os.makedirs(os.path.dirname(_DB_FILE)) db = sqlite3.connect(_DB_FILE) cursor = db.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND @@ -23,7 +27,7 @@ async def initDB(): logger.info('数据库初始化完成') -async def queryBindInfo(QQNumber: int, gameType: str) -> dict: +async def queryBindInfo(QQNumber: str | int, gameType: str) -> str | None: # 查询绑定信息 db = sqlite3.connect(_DB_FILE) cursor = db.cursor() @@ -32,21 +36,21 @@ async def queryBindInfo(QQNumber: int, gameType: str) -> dict: db.commit() db.close() if user is None: - return {'Hit': False, 'User': None} + return None else: - return {'Hit': True, 'User': user[0]} + return user[0] -async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str: +async def writeBindInfo(QQNumber: str | int, user: str, gameType: str) -> str: # 写入绑定信息 - info = await queryBindInfo(QQNumber, gameType) + bindInfo = await queryBindInfo(QQNumber, gameType) db = sqlite3.connect(_DB_FILE) cursor = db.cursor() - if info['Hit'] is True: + if bindInfo is not None: cursor.execute( f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber)) message = '更新成功' - elif info['Hit'] is False: + elif bindInfo is None: cursor.execute( f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user)) message = '绑定成功' diff --git a/nonebot_plugin_tetris_stats/Utils/__init__.py b/nonebot_plugin_tetris_stats/Utils/__init__.py new file mode 100644 index 0000000..60e38d8 --- /dev/null +++ b/nonebot_plugin_tetris_stats/Utils/__init__.py @@ -0,0 +1 @@ +from . import MessageAnalyzer, Request, SQL \ No newline at end of file diff --git a/nonebot_plugin_tetris_stats/__init__.py b/nonebot_plugin_tetris_stats/__init__.py index 3d46531..4281b97 100644 --- a/nonebot_plugin_tetris_stats/__init__.py +++ b/nonebot_plugin_tetris_stats/__init__.py @@ -1,15 +1,9 @@ -from re import I +from nonebot import get_driver -from nonebot import get_driver, on_regex -from nonebot.adapters.onebot.v11 import GROUP, MessageEvent -from nonebot.matcher import Matcher +from .Utils.SQL import initDB -from .MessageAnalyzer import * -from .SQL import * +from . import GameDataProcessor -from .IODataProcessing import generateMessage as IOgenerateMessage -from .IODataProcessing import getUserIDInfo as IOgetUserIDInfo -from .TOSDataProcessing import generateMessage as TOSgenerateMessage driver = get_driver() @@ -17,84 +11,3 @@ driver = get_driver() @driver.on_startup async def startUP(): await initDB() - -ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP) -ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP) - -tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats', - flags=I, permission=GROUP) - -topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP) -topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP) - - -@ioBind.handle() -async def bindIOUser(event: MessageEvent, matcher: Matcher): - decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO') - if decodedMessage['Success'] is True: - if decodedMessage['Type'] == 'ID': - userIDInfo = await IOgetUserIDInfo(userID=decodedMessage['User']) - if userIDInfo['Success'] is False: - await matcher.finish(message=userIDInfo['Message']) - elif decodedMessage['Type'] == 'Name': - userIDInfo = await IOgetUserIDInfo(userName=decodedMessage['User']) - if userIDInfo['Success'] is False: - await matcher.finish(message=userIDInfo['Message']) - message = await writeBindInfo(QQNumber=event.sender.user_id, user=userIDInfo['userID'], gameType='IO') - elif decodedMessage['Success'] is False: - message = decodedMessage['Message'] - await matcher.finish(message=message) - - -@ioStats.handle() -async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher): - decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO') - if decodedMessage['Success'] is True: - if decodedMessage['Type'] == 'AT': - if event.is_tome() is True: - await matcher.finish(message='不能查询bot的信息') - bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO') - if bindInfo['Hit'] is True: - message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}') - elif bindInfo['Hit'] is False: - message = '未查询到绑定信息' - elif decodedMessage['Type'] == 'ME': - bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO') - if bindInfo['Hit'] is True: - message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}') - elif bindInfo['Hit'] is False: - message = '您还没有绑定账号' - elif decodedMessage['Type'] == 'ID': - message = await IOgenerateMessage(userID=decodedMessage['User']) - elif decodedMessage['Type'] == 'Name': - message = await IOgenerateMessage(userName=decodedMessage['User']) - elif decodedMessage['Success'] is False: - message = decodedMessage['Message'] - await matcher.finish(message=message) - - -@tosStats.handle() -async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher): - decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS') - if decodedMessage['Success'] is True: - if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ': - if decodedMessage['QQNumber'] == event.self_id: - await matcher.finish(message='不能查询bot的信息') - message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber']) - elif decodedMessage['Type'] == 'ME': - message = await TOSgenerateMessage(teaID=event.sender.user_id) - elif decodedMessage['Type'] == 'Name': - message = await TOSgenerateMessage(userName=decodedMessage['User']) - elif decodedMessage['Success'] is False: - message = decodedMessage['Message'] - await matcher.finish(message=message) - - -@topBind.handle() -async def bindTOPUser(event: MessageEvent, matcher: Matcher): - await matcher.finish(message='TODO') - - -@topStats.handle() -async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher): - await matcher.finish(message='TODO') diff --git a/pyproject.toml b/pyproject.toml index e489279..8bf50c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "nonebot-plugin-tetris-stats" -version = "0.1.1" +version = "0.2.0" description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件" authors = ["scdhh "] readme = "README.md"