commit c7b9716a70e7e4a2c9837b2e58bfdef866fcc97f Author: scdhh Date: Tue May 24 10:38:31 2022 +0800 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..445f767 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/test.py +/Untitled-1.json +/Untitled-2.html +/Untitled-2.json +/Untitled-3.json +/Untitled-4.html \ No newline at end of file diff --git a/nonebot_plugin_tetris_stats/IODataProcessing.py b/nonebot_plugin_tetris_stats/IODataProcessing.py new file mode 100644 index 0000000..8abf102 --- /dev/null +++ b/nonebot_plugin_tetris_stats/IODataProcessing.py @@ -0,0 +1,137 @@ +from nonebot.log import logger + +from asyncio import gather +import aiohttp + +# 封装请求函数 +async def request(Url: str) -> dict[str, bool|dict[str, any]]: + data = {} + try: + async with aiohttp.ClientSession() as session: + async with session.get(Url) as resp: + data['Status'] = True + data['Data'] = await resp.json() + data['Success'] = data['Data']['success'] + except aiohttp.client_exceptions.ClientConnectorError as e: + logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}') + data['Status'] = False + finally: + return data + +# 获取用户数据 +async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: + if userName is not None and userID is None: + userDataUrl = f'https://ch.tetr.io/api/users/{userName}' + userData = await request(Url=userDataUrl) + elif userName is None and userID is not None: + userDataUrl = f'https://ch.tetr.io/api/users/{userID}' + userData = await request(Url=userDataUrl) + else: + raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误') + return userData + +# 获取Solo数据 +async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]: + if userName is not None and userID is None: + userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records' + soloData = await request(Url = userSoloUrl) + elif userName is None and userID is not None: + userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records' + soloData = await request(Url = userSoloUrl) + else: + raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误') + return soloData + +# 获取用户ID +async def getUserID(userData: dict = None, userName: str = None) -> str: + if userName is not None and userData is None: + userData = await getUserData(userName=userName) + elif userData is None and userName is None: + raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误') + return userData['Data']['data']['user']['_id'] + +# 获取排位统计数据 +async def getLeagueStats(userData: dict) -> dict[str, bool|int|str|float]: + league = userData['Data']['data']['user']['league'] + leagueStats = {} + if league['gamesplayed'] == 0: + leagueStats['Played'] = False + else: + leagueStats['Played'] = True + leagueStats['PPS'] = league['pps'] + leagueStats['APM'] = league['apm'] + leagueStats['VS'] = 0 if league['vs'] is None else league['vs'] + leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper() + if league['rating'] != -1: + leagueStats['Ranked'] = True + leagueStats['Rating'] = round(league['rating'], 2) + leagueStats['Glicko'] = round(league['glicko'], 2) + leagueStats['RD'] = round(league['rd'], 2) + else: + leagueStats['Ranked'] = False + leagueStats['Standing'] = league['standing'] + leagueStats['LPM'] = round((league['pps'] * 24), 2) + leagueStats['APL'] = round((leagueStats['APM'] / leagueStats['LPM']), 2) + leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2) + leagueStats['ADPL'] = round((leagueStats['ADPM'] / leagueStats['LPM']), 2) + return leagueStats + +# 获取40L统计数据 +async def getSprintStats(soloData: dict) -> dict[str, bool|int|float]: + sprintStats = {} + if soloData['Data']['data']['records']['40l']['record'] is None: + sprintStats['Played'] = False + else: + sprintStats['Played'] = True + sprintStats['Rank'] = False if soloData['Data']['data']['records']['40l']['rank'] is None else soloData['Data']['data']['records']['40l']['rank'] + sprintStats['Time'] = round(soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2) + return sprintStats + +# 获取Blitz统计数据 +async def getBlitzStats(soloData: dict) -> dict[str, bool|int]: + blitzStats = {} + if soloData['Data']['data']['records']['blitz']['record'] is None: + blitzStats['Played'] = False + else: + blitzStats['Played'] = True + blitzStats['Rank'] = False if soloData['Data']['data']['records']['blitz']['rank'] is None else soloData['Data']['data']['records']['blitz']['rank'] + blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score'] + return blitzStats + +# 生成消息 +async def generateMessage(userName: str = None, userID: str = None) -> str: + userData, soloData = await gather(getUserData(userName = userName, userID = userID), getSoloData(userName = userName, userID = userID)) + if userData['Status'] is False: + return '用户信息请求失败' + if userData['Success'] is False: + return f'用户信息请求错误:\n{userData["Data"]["error"]}' + userName = userData['Data']['data']['user']['username'].upper() + leagueStats = await getLeagueStats(userData) + message = '' + if leagueStats['Played'] is False: + message += f'用户 {userName} 没有排位统计数据' + else: + if leagueStats['Rank'] is False and leagueStats['Ranked'] is False: + message += f'用户 {userName} 暂未完成定级赛' + elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True: + message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR' + else: + message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})' + message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:' + message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )' + message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )' + if leagueStats["VS"] != 0: + message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )' + if soloData['Status'] is False: + return f'{message}\nSolo统计数据请求失败' + sprintStats = await getSprintStats(soloData) + blitzStats = await getBlitzStats(soloData) + if sprintStats['Played'] is True: + message += f'\n40L: {sprintStats["Time"]}s' + if sprintStats['Rank'] is not False: + message += f' ( #{sprintStats["Rank"]} )' + if blitzStats['Played'] is True: + message += f'\nBlitz: {blitzStats["Score"]}' + if blitzStats['Rank'] is not False: + message += f' ( #{blitzStats["Rank"]} )' + return message diff --git a/nonebot_plugin_tetris_stats/MessageAnalyzer.py b/nonebot_plugin_tetris_stats/MessageAnalyzer.py new file mode 100644 index 0000000..f95a493 --- /dev/null +++ b/nonebot_plugin_tetris_stats/MessageAnalyzer.py @@ -0,0 +1,69 @@ +from re import match + +# userBind +async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]: + _CMD_ALIASES = {'IO': ['io绑定', 'iobind'], + 'TOP': ['top绑定', 'topbind']} + # 剔除命令前缀 + for i in _CMD_ALIASES[gameType]: + if message.startswith(i): + message = message.replace(i, '') + message = message.strip() + break + if message == '' or message.isspace(): + return {'Success': False, 'Type': None, 'Message': '用户名为空'} + else: + return await checkName(message, gameType) + +# statsQuery +async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]: + _CMD_ALIASES = {'IO': ['io查', 'iostats'], + 'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'], + 'TOP': ['top查', 'topstats']} + _ME = ['我', '自己', '私', '俺', 'me'] + message = (message.strip()).lower() + # 剔除命令前缀 + for i in _CMD_ALIASES[gameType]: + if message.startswith(i): + message = message.replace(i, '') + message = message.strip() + break + if message == '' or message.isspace(): + return {'Success': False, 'Type': None, 'Message': '用户名为空'} + else: + if message.startswith('[cq:at,qq='): + try: + QQNumber = int((str(message)).split( + '[cq:at,qq=')[1].split(']')[0]) + except ValueError: + return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'} + else: + return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber} + elif message in _ME: + return {'Success': True, 'Type': 'ME', 'Message': None} + else: + return await checkName(message, gameType) + +async def checkName(name: str, gameType: str) -> dict[str, bool | str]: + if gameType == 'IO': + if match(r'^[a-f0-9]{24}$', name): + return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name} + elif match(r'^[a-zA-Z0-9_-]{3,16}$', name): + return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} + else: + return {'Success': False, 'Type': None, 'Message': '用户名不合法'} + elif gameType == 'TOP': + if match(r'^[a-zA-Z0-9_]{1,16}$', name): + return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} + else: + return {'Success': False, 'Type': None, 'Message': '用户名不合法'} + elif gameType == 'TOS': + if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name) + and name.isdigit() is False + and 2 <= len(name) <= 18): + # 虽然我也不想这么长 但是似乎确实得这么长 + return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name} + elif name.isdigit() is True: + return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name} + else: + return {'Success': False, 'Type': None, 'Message': '用户名不合法'} diff --git a/nonebot_plugin_tetris_stats/SQL.py b/nonebot_plugin_tetris_stats/SQL.py new file mode 100644 index 0000000..f43eb55 --- /dev/null +++ b/nonebot_plugin_tetris_stats/SQL.py @@ -0,0 +1,52 @@ +from nonebot.log import logger + +import sqlite3 +import os + +_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db' + +# 初始化数据库 +async def initDB(): + if not os.path.exists(os.path.dirname(_DB_FILE)): + os.makedirs(os.path.dirname(_DB_FILE)) + db = sqlite3.connect(_DB_FILE) + cursor = db.cursor() + cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND + (QQ INTEGER NOT NULL, + USER TEXT NOT NULL)''') + cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND + (QQ INTEGER NOT NULL, + USER TEXT NOT NULL)''') + db.commit() + db.close() + logger.info('数据库初始化完成') + +# 查询绑定信息 +async def queryBindInfo(QQNumber: int, gameType: str) -> dict: + db = sqlite3.connect(_DB_FILE) + cursor = db.cursor() + cursor.execute(f'SELECT USER FROM {gameType}BIND WHERE QQ = {QQNumber}') + user = cursor.fetchone() + db.commit() + db.close() + if user is None: + return {'Hit': False, 'User': None} + else: + return {'Hit': True, 'User': user[0]} + +# 写入绑定信息 +async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str: + info = await queryBindInfo(QQNumber, gameType) + db = sqlite3.connect(_DB_FILE) + cursor = db.cursor() + if info['Hit'] is True: + cursor.execute( + f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber)) + message = '更新成功' + elif info['Hit'] is False: + cursor.execute( + f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user)) + message = '绑定成功' + db.commit() + db.close() + return message diff --git a/nonebot_plugin_tetris_stats/TOSDataProcessing.py b/nonebot_plugin_tetris_stats/TOSDataProcessing.py new file mode 100644 index 0000000..3ad687c --- /dev/null +++ b/nonebot_plugin_tetris_stats/TOSDataProcessing.py @@ -0,0 +1,143 @@ +from nonebot.log import logger + +from asyncio import gather +import aiohttp + +# 封装请求函数 +async def request(Url: str) -> dict[str, bool|dict[str, any]]: + data = {} + try: + async with aiohttp.ClientSession() as session: + async with session.get(Url) as resp: + data['Status'] = True + data['Data'] = await resp.json() + data['Success'] = data['Data']['success'] + except aiohttp.client_exceptions.ClientConnectorError as e: + logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}') + data['Status'] = False + finally: + return data + +# 获取用户信息 +async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool|dict[str, any]]: + if userName is not None and teaID is None: + userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}' + userInfo = await request(Url=userDataUrl) + elif teaID is not None and userName is None: + userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}' + userInfo = await request(Url=userDataUrl) + else: + raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误') + return userInfo + +# 获取用户数据 +async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool|dict[str, any]]: + if userName is not None and teaID is None: + userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}' + userData = await request(Url=userDataUrl) + elif teaID is not None and userName is None: + userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}' + userData = await request(Url=userDataUrl) + else: + raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误') + return userData + +# 获取Rank数据 +async def getRankStats(userInfo: dict) -> dict[str, bool|float]: + rankStats = {} + if int(userInfo['Data']['data']['rankedGames']) == 0: + rankStats['Played'] = False + else: + rankStats['Played'] = True + rankStats['Rating'] = round(float(userInfo['Data']['data']['ratingNow']), 2) + rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2) + rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3) + return rankStats + +# 获取游戏数据 +async def getGameData(userData: dict) -> dict[str, bool|int|float]: + gameData = {} + if userData['Data']['data'] == []: + gameData['Played'] = False + else: + gameData['Played'] = True + weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0 + for i in userData['Data']['data']: + # 排除单人局和时间为0的游戏 + if i['num_players'] == 1 or i['time'] == 0: + continue + # 茶:不计算没挖掘的局,即使apm和lpm也如此 + if i['dig'] is None: + break + # 加权计算 + time = i['time'] / 1000 + lpm = 24 * (i['pieces'] / time) + apm = (i['attack'] / time) * 60 + adpm = ((i['attack'] + i['dig']) / time) * 60 + weightedTotalLpm += lpm * time + weightedTotalApm += apm * time + weightedTotalAdpm += adpm * time + weightedTotalTime += time + num += 1 + if num == 50: + break + gameData['NUM'] = num + gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2) + gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2) + gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2) + gameData['PPS'] = round((gameData['LPM'] / 24), 2) + gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2) + gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2) + gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2) + # TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息 + return gameData + +# 获取PB数据 +async def getPBData(userInfo: dict) -> dict[str, bool|float|str]: + PBData = {} + if int(userInfo['Data']['data']['PBSprint']) == 2147483647: + PBData['Sprint'] = False + else: + PBData['Sprint'] = round(float(userInfo['Data']['data']['PBSprint']) / 1000, 2) + if int(userInfo['Data']['data']['PBMarathon']) == 0: + PBData['Marathon'] = False + else: + PBData['Marathon'] = userInfo['Data']['data']['PBMarathon'] + if int(userInfo['Data']['data']['PBChallenge']) == 0: + PBData['Challenge'] = False + else: + PBData['Challenge'] = userInfo['Data']['data']['PBChallenge'] + return PBData + +# 生成消息 +async def generateMessage(userName: str = None, teaID: int = None) -> str: + userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID)) + if userInfo['Status'] is False: + return f'用户信息请求失败' + if userInfo['Success'] is False: + return f'用户信息请求错误:\n{userInfo["Data"]["error"]}' + rankStats = await getRankStats(userInfo) + PBData = await getPBData(userInfo) + message = '' + if rankStats['Played'] is False: + message += f'用户 {userInfo["Data"]["data"]["name"]}({userInfo["Data"]["data"]["teaId"]})暂无段位统计数据' + elif rankStats['Played'] is True: + message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) ' + if userData['Status'] is False: + message = f'{message.rstrip()}\n游戏数据请求失败' + elif userData['Status'] is True: + gameData = await getGameData(userData) + if gameData['Played'] is False: + message += ', 暂无游戏数据' + elif gameData['Played'] is True: + message += f', 最近 {gameData["NUM"]} 局数据' + message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )' + message += f'\nAPM:{gameData["APM"]} ( x{gameData["APL"]} )' + message += f'\nADPM:{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )' + if PBData['Sprint'] is not False: + message += f'\n40L: {PBData["Sprint"]}s' + if PBData['Marathon']is not False: + message += f'\nMarathon: {PBData["Marathon"]}' + if PBData['Challenge'] is not False: + message += f'\nChallenge: {PBData["Challenge"]}' + return message diff --git a/nonebot_plugin_tetris_stats/__init__.py b/nonebot_plugin_tetris_stats/__init__.py new file mode 100644 index 0000000..0c387b4 --- /dev/null +++ b/nonebot_plugin_tetris_stats/__init__.py @@ -0,0 +1,86 @@ +from re import I + +from nonebot import get_driver, on_regex +from nonebot.adapters.onebot.v11 import GROUP, MessageEvent +from nonebot.matcher import Matcher + +from .MessageAnalyzer import * +from .SQL import * + +from .IODataProcessing import generateMessage as IOgenerateMessage +from .IODataProcessing import getUserID as IOgetUserID +from .TOSDataProcessing import generateMessage as TOSgenerateMessage + +driver = get_driver() + +@driver.on_startup +async def startUP(): + await initDB() + +ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP) +ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP) + +tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats', + flags=I, permission=GROUP) + +topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP) +topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP) + +@ioBind.handle() +async def bindIOUser(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO') + if decodedMessage['Success'] is True: + if decodedMessage['Type'] == 'ID': + user = decodedMessage['User'] + elif decodedMessage['Type'] == 'Name': + user = await IOgetUserID(userName=decodedMessage['User']) + message = await writeBindInfo(QQNumber=event.sender.user_id, user=user, gameType='IO') + elif decodedMessage['Success'] is False: + message = decodedMessage['Message'] + await matcher.send(message=message) + +@ioStats.handle() +async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO') + if decodedMessage['Success'] is True: + if decodedMessage['Type'] == 'AT': + bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO') + if bindInfo['Hit'] is True: + message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}') + elif bindInfo['Hit'] is False: + message = '未查询到绑定信息' + elif decodedMessage['Type'] == 'ME': + bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO') + if bindInfo['Hit'] is True: + message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}') + elif bindInfo['Hit'] is False: + message = '您还没有绑定账号' + elif decodedMessage['Type'] == 'ID': + message = await IOgenerateMessage(userID=decodedMessage['User']) + elif decodedMessage['Type'] == 'Name': + message = await IOgenerateMessage(userName=decodedMessage['User']) + elif decodedMessage['Success'] is False: + message = decodedMessage['Message'] + await matcher.finish(message=message) + +@tosStats.handle() +async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher): + decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS') + if decodedMessage['Success'] is True: + if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ': + message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber']) + elif decodedMessage['Type'] == 'ME': + message = await TOSgenerateMessage(teaID=event.sender.user_id) + elif decodedMessage['Type'] == 'Name': + message = await TOSgenerateMessage(userName=decodedMessage['User']) + elif decodedMessage['Success'] is False: + message = decodedMessage['Message'] + await matcher.finish(message=message) + +@topBind.handle() +async def bindTOPUser(event: MessageEvent, matcher: Matcher): + await matcher.send(message='TODO') + +@topStats.handle() +async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher): + await matcher.send(message='TODO') diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4d84867 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[tool.poetry] +name = "nonebot-plugin-tetris-stats" +version = "0.1.0" +description = "" +authors = ["scdhh "] +license = "MIT" + +[tool.poetry.dependencies] +python = "^3.10" +nonebot2 = "^2.0.0-beta.3" + +[tool.poetry.dev-dependencies] + +[build-system] +requires = ["poetry-core>=1.0.0"] +build-backend = "poetry.core.masonry.api"