From c9e79232433b142b071def92a52bab4e7cdd11c7 Mon Sep 17 00:00:00 2001 From: scdhh Date: Tue, 24 May 2022 18:58:19 +0800 Subject: [PATCH] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IODataProcessing.py | 57 ++++++++++++------- .../MessageAnalyzer.py | 7 ++- nonebot_plugin_tetris_stats/SQL.py | 9 ++- .../TOSDataProcessing.py | 41 +++++++------ nonebot_plugin_tetris_stats/__init__.py | 6 ++ 5 files changed, 78 insertions(+), 42 deletions(-) diff --git a/nonebot_plugin_tetris_stats/IODataProcessing.py b/nonebot_plugin_tetris_stats/IODataProcessing.py index 8abf102..c7ef97d 100644 --- a/nonebot_plugin_tetris_stats/IODataProcessing.py +++ b/nonebot_plugin_tetris_stats/IODataProcessing.py @@ -3,8 +3,9 @@ from nonebot.log import logger from asyncio import gather import aiohttp -# 封装请求函数 -async def request(Url: str) -> dict[str, bool|dict[str, any]]: + +async def request(Url: str) -> dict[str, bool | dict[str, any]]: + # 封装请求函数 data = {} try: async with aiohttp.ClientSession() as session: @@ -18,8 +19,9 @@ async def request(Url: str) -> dict[str, bool|dict[str, any]]: finally: return data -# 获取用户数据 + async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: + # 获取用户数据 if userName is not None and userID is None: userDataUrl = f'https://ch.tetr.io/api/users/{userName}' userData = await request(Url=userDataUrl) @@ -30,28 +32,31 @@ async def getUserData(userName: str = None, userID: str = None) -> dict[str, dic raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误') return userData -# 获取Solo数据 + async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: + # 获取Solo数据 if userName is not None and userID is None: userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records' - soloData = await request(Url = userSoloUrl) + soloData = await request(Url=userSoloUrl) elif userName is None and userID is not None: userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records' - soloData = await request(Url = userSoloUrl) + soloData = await request(Url=userSoloUrl) else: raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误') return soloData -# 获取用户ID + async def getUserID(userData: dict = None, userName: str = None) -> str: + # 获取用户ID if userName is not None and userData is None: userData = await getUserData(userName=userName) elif userData is None and userName is None: raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误') return userData['Data']['data']['user']['_id'] -# 获取排位统计数据 -async def getLeagueStats(userData: dict) -> dict[str, bool|int|str|float]: + +async def getLeagueStats(userData: dict) -> dict[str, bool | int | str | float]: + # 获取排位统计数据 league = userData['Data']['data']['user']['league'] leagueStats = {} if league['gamesplayed'] == 0: @@ -71,36 +76,48 @@ async def getLeagueStats(userData: dict) -> dict[str, bool|int|str|float]: leagueStats['Ranked'] = False leagueStats['Standing'] = league['standing'] leagueStats['LPM'] = round((league['pps'] * 24), 2) - leagueStats['APL'] = round((leagueStats['APM'] / leagueStats['LPM']), 2) + leagueStats['APL'] = round( + (leagueStats['APM'] / leagueStats['LPM']), 2) leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2) - leagueStats['ADPL'] = round((leagueStats['ADPM'] / leagueStats['LPM']), 2) + leagueStats['ADPL'] = round( + (leagueStats['ADPM'] / leagueStats['LPM']), 2) return leagueStats -# 获取40L统计数据 -async def getSprintStats(soloData: dict) -> dict[str, bool|int|float]: + +async def getSprintStats(soloData: dict) -> dict[str, bool | int | float]: + # 获取40L统计数据 sprintStats = {} if soloData['Data']['data']['records']['40l']['record'] is None: sprintStats['Played'] = False else: sprintStats['Played'] = True - sprintStats['Rank'] = False if soloData['Data']['data']['records']['40l']['rank'] is None else soloData['Data']['data']['records']['40l']['rank'] - sprintStats['Time'] = round(soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2) + if soloData['Data']['data']['records']['40l']['rank'] is None: + sprintStats['Rank'] = False + else: + sprintStats['Rank'] = soloData['Data']['data']['records']['40l']['rank'] + sprintStats['Time'] = round( + soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2) return sprintStats -# 获取Blitz统计数据 -async def getBlitzStats(soloData: dict) -> dict[str, bool|int]: + +async def getBlitzStats(soloData: dict) -> dict[str, bool | int]: + # 获取Blitz统计数据 blitzStats = {} if soloData['Data']['data']['records']['blitz']['record'] is None: blitzStats['Played'] = False else: blitzStats['Played'] = True - blitzStats['Rank'] = False if soloData['Data']['data']['records']['blitz']['rank'] is None else soloData['Data']['data']['records']['blitz']['rank'] + if soloData['Data']['data']['records']['blitz']['rank'] is None: + blitzStats['Rank'] = False + else: + blitzStats['Rank'] = soloData['Data']['data']['records']['blitz']['rank'] blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score'] return blitzStats -# 生成消息 + async def generateMessage(userName: str = None, userID: str = None) -> str: - userData, soloData = await gather(getUserData(userName = userName, userID = userID), getSoloData(userName = userName, userID = userID)) + # 生成消息 + userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID)) if userData['Status'] is False: return '用户信息请求失败' if userData['Success'] is False: diff --git a/nonebot_plugin_tetris_stats/MessageAnalyzer.py b/nonebot_plugin_tetris_stats/MessageAnalyzer.py index f95a493..37fe87f 100644 --- a/nonebot_plugin_tetris_stats/MessageAnalyzer.py +++ b/nonebot_plugin_tetris_stats/MessageAnalyzer.py @@ -1,6 +1,6 @@ from re import match -# userBind + async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]: _CMD_ALIASES = {'IO': ['io绑定', 'iobind'], 'TOP': ['top绑定', 'topbind']} @@ -15,7 +15,7 @@ async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str else: return await checkName(message, gameType) -# statsQuery + async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]: _CMD_ALIASES = {'IO': ['io查', 'iostats'], 'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'], @@ -44,6 +44,7 @@ async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool else: return await checkName(message, gameType) + async def checkName(name: str, gameType: str) -> dict[str, bool | str]: if gameType == 'IO': if match(r'^[a-f0-9]{24}$', name): @@ -60,7 +61,7 @@ async def checkName(name: str, gameType: str) -> dict[str, bool | str]: elif gameType == 'TOS': if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name) and name.isdigit() is False - and 2 <= len(name) <= 18): + and 2 <= len(name) <= 18): # 虽然我也不想这么长 但是似乎确实得这么长 return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} elif name.isdigit() is True: diff --git a/nonebot_plugin_tetris_stats/SQL.py b/nonebot_plugin_tetris_stats/SQL.py index f43eb55..34b172f 100644 --- a/nonebot_plugin_tetris_stats/SQL.py +++ b/nonebot_plugin_tetris_stats/SQL.py @@ -5,8 +5,9 @@ import os _DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db' -# 初始化数据库 + async def initDB(): + # 初始化数据库 if not os.path.exists(os.path.dirname(_DB_FILE)): os.makedirs(os.path.dirname(_DB_FILE)) db = sqlite3.connect(_DB_FILE) @@ -21,8 +22,9 @@ async def initDB(): db.close() logger.info('数据库初始化完成') -# 查询绑定信息 + async def queryBindInfo(QQNumber: int, gameType: str) -> dict: + # 查询绑定信息 db = sqlite3.connect(_DB_FILE) cursor = db.cursor() cursor.execute(f'SELECT USER FROM {gameType}BIND WHERE QQ = {QQNumber}') @@ -34,8 +36,9 @@ async def queryBindInfo(QQNumber: int, gameType: str) -> dict: else: return {'Hit': True, 'User': user[0]} -# 写入绑定信息 + async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str: + # 写入绑定信息 info = await queryBindInfo(QQNumber, gameType) db = sqlite3.connect(_DB_FILE) cursor = db.cursor() diff --git a/nonebot_plugin_tetris_stats/TOSDataProcessing.py b/nonebot_plugin_tetris_stats/TOSDataProcessing.py index 3ad687c..079d5b1 100644 --- a/nonebot_plugin_tetris_stats/TOSDataProcessing.py +++ b/nonebot_plugin_tetris_stats/TOSDataProcessing.py @@ -3,8 +3,9 @@ from nonebot.log import logger from asyncio import gather import aiohttp -# 封装请求函数 -async def request(Url: str) -> dict[str, bool|dict[str, any]]: + +async def request(Url: str) -> dict[str, bool | dict[str, any]]: + # 封装请求函数 data = {} try: async with aiohttp.ClientSession() as session: @@ -18,8 +19,9 @@ async def request(Url: str) -> dict[str, bool|dict[str, any]]: finally: return data -# 获取用户信息 -async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool|dict[str, any]]: + +async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool | dict[str, any]]: + # 获取用户信息 if userName is not None and teaID is None: userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}' userInfo = await request(Url=userDataUrl) @@ -30,8 +32,9 @@ async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误') return userInfo -# 获取用户数据 -async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool|dict[str, any]]: + +async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool | dict[str, any]]: + # 获取用户数据 if userName is not None and teaID is None: userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}' userData = await request(Url=userDataUrl) @@ -42,20 +45,23 @@ async def getUserData(userName: str = None, teaID: int = None, otherParameter: s raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误') return userData -# 获取Rank数据 -async def getRankStats(userInfo: dict) -> dict[str, bool|float]: + +async def getRankStats(userInfo: dict) -> dict[str, bool | float]: + # 获取Rank数据 rankStats = {} if int(userInfo['Data']['data']['rankedGames']) == 0: rankStats['Played'] = False else: rankStats['Played'] = True - rankStats['Rating'] = round(float(userInfo['Data']['data']['ratingNow']), 2) + rankStats['Rating'] = round( + float(userInfo['Data']['data']['ratingNow']), 2) rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2) rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3) return rankStats -# 获取游戏数据 -async def getGameData(userData: dict) -> dict[str, bool|int|float]: + +async def getGameData(userData: dict) -> dict[str, bool | int | float]: + # 获取游戏数据 gameData = {} if userData['Data']['data'] == []: gameData['Played'] = False @@ -92,13 +98,15 @@ async def getGameData(userData: dict) -> dict[str, bool|int|float]: # TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息 return gameData -# 获取PB数据 -async def getPBData(userInfo: dict) -> dict[str, bool|float|str]: + +async def getPBData(userInfo: dict) -> dict[str, bool | float | str]: + # 获取PB数据 PBData = {} if int(userInfo['Data']['data']['PBSprint']) == 2147483647: PBData['Sprint'] = False else: - PBData['Sprint'] = round(float(userInfo['Data']['data']['PBSprint']) / 1000, 2) + PBData['Sprint'] = round( + float(userInfo['Data']['data']['PBSprint']) / 1000, 2) if int(userInfo['Data']['data']['PBMarathon']) == 0: PBData['Marathon'] = False else: @@ -109,8 +117,9 @@ async def getPBData(userInfo: dict) -> dict[str, bool|float|str]: PBData['Challenge'] = userInfo['Data']['data']['PBChallenge'] return PBData -# 生成消息 + async def generateMessage(userName: str = None, teaID: int = None) -> str: + # 生成消息 userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID)) if userInfo['Status'] is False: return f'用户信息请求失败' @@ -136,7 +145,7 @@ async def generateMessage(userName: str = None, teaID: int = None) -> str: message += f'\nADPM:{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )' if PBData['Sprint'] is not False: message += f'\n40L: {PBData["Sprint"]}s' - if PBData['Marathon']is not False: + if PBData['Marathon'] is not False: message += f'\nMarathon: {PBData["Marathon"]}' if PBData['Challenge'] is not False: message += f'\nChallenge: {PBData["Challenge"]}' diff --git a/nonebot_plugin_tetris_stats/__init__.py b/nonebot_plugin_tetris_stats/__init__.py index 0c387b4..4a82aa2 100644 --- a/nonebot_plugin_tetris_stats/__init__.py +++ b/nonebot_plugin_tetris_stats/__init__.py @@ -13,6 +13,7 @@ from .TOSDataProcessing import generateMessage as TOSgenerateMessage driver = get_driver() + @driver.on_startup async def startUP(): await initDB() @@ -26,6 +27,7 @@ tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats', topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP) topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP) + @ioBind.handle() async def bindIOUser(event: MessageEvent, matcher: Matcher): decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO') @@ -39,6 +41,7 @@ async def bindIOUser(event: MessageEvent, matcher: Matcher): message = decodedMessage['Message'] await matcher.send(message=message) + @ioStats.handle() async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher): decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO') @@ -63,6 +66,7 @@ async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher): message = decodedMessage['Message'] await matcher.finish(message=message) + @tosStats.handle() async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher): decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS') @@ -77,10 +81,12 @@ async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher): message = decodedMessage['Message'] await matcher.finish(message=message) + @topBind.handle() async def bindTOPUser(event: MessageEvent, matcher: Matcher): await matcher.send(message='TODO') + @topStats.handle() async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher): await matcher.send(message='TODO')