first commit

This commit is contained in:
2022-05-24 10:38:31 +08:00
commit c7b9716a70
7 changed files with 509 additions and 0 deletions

View File

@@ -0,0 +1,137 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
# 封装请求函数
async def request(Url: str) -> dict[str, bool|dict[str, any]]:
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
# 获取用户数据
async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
userData = await request(Url=userDataUrl)
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误')
return userData
# 获取Solo数据
async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
soloData = await request(Url = userSoloUrl)
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
soloData = await request(Url = userSoloUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误')
return soloData
# 获取用户ID
async def getUserID(userData: dict = None, userName: str = None) -> str:
if userName is not None and userData is None:
userData = await getUserData(userName=userName)
elif userData is None and userName is None:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误')
return userData['Data']['data']['user']['_id']
# 获取排位统计数据
async def getLeagueStats(userData: dict) -> dict[str, bool|int|str|float]:
league = userData['Data']['data']['user']['league']
leagueStats = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round((leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round((leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
# 获取40L统计数据
async def getSprintStats(soloData: dict) -> dict[str, bool|int|float]:
sprintStats = {}
if soloData['Data']['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
sprintStats['Rank'] = False if soloData['Data']['data']['records']['40l']['rank'] is None else soloData['Data']['data']['records']['40l']['rank']
sprintStats['Time'] = round(soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
# 获取Blitz统计数据
async def getBlitzStats(soloData: dict) -> dict[str, bool|int]:
blitzStats = {}
if soloData['Data']['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
blitzStats['Rank'] = False if soloData['Data']['data']['records']['blitz']['rank'] is None else soloData['Data']['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
# 生成消息
async def generateMessage(userName: str = None, userID: str = None) -> str:
userData, soloData = await gather(getUserData(userName = userName, userID = userID), getSoloData(userName = userName, userID = userID))
if userData['Status'] is False:
return '用户信息请求失败'
if userData['Success'] is False:
return f'用户信息请求错误:\n{userData["Data"]["error"]}'
userName = userData['Data']['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData)
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData['Status'] is False:
return f'{message}\nSolo统计数据请求失败'
sprintStats = await getSprintStats(soloData)
blitzStats = await getBlitzStats(soloData)
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -0,0 +1,69 @@
from re import match
# userBind
async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
return await checkName(message, gameType)
# statsQuery
async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '', '', 'me']
message = (message.strip()).lower()
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
if message.startswith('[cq:at,qq='):
try:
QQNumber = int((str(message)).split(
'[cq:at,qq=')[1].split(']')[0])
except ValueError:
return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'}
else:
return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber}
elif message in _ME:
return {'Success': True, 'Type': 'ME', 'Message': None}
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> dict[str, bool | str]:
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name}
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
elif name.isdigit() is True:
return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}

View File

@@ -0,0 +1,52 @@
from nonebot.log import logger
import sqlite3
import os
_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db'
# 初始化数据库
async def initDB():
if not os.path.exists(os.path.dirname(_DB_FILE)):
os.makedirs(os.path.dirname(_DB_FILE))
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
db.commit()
db.close()
logger.info('数据库初始化完成')
# 查询绑定信息
async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute(f'SELECT USER FROM {gameType}BIND WHERE QQ = {QQNumber}')
user = cursor.fetchone()
db.commit()
db.close()
if user is None:
return {'Hit': False, 'User': None}
else:
return {'Hit': True, 'User': user[0]}
# 写入绑定信息
async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str:
info = await queryBindInfo(QQNumber, gameType)
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
if info['Hit'] is True:
cursor.execute(
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
message = '更新成功'
elif info['Hit'] is False:
cursor.execute(
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
message = '绑定成功'
db.commit()
db.close()
return message

View File

@@ -0,0 +1,143 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
# 封装请求函数
async def request(Url: str) -> dict[str, bool|dict[str, any]]:
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
# 获取用户信息
async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool|dict[str, any]]:
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
userInfo = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
userInfo = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误')
return userInfo
# 获取用户数据
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool|dict[str, any]]:
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
userData = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误')
return userData
# 获取Rank数据
async def getRankStats(userInfo: dict) -> dict[str, bool|float]:
rankStats = {}
if int(userInfo['Data']['data']['rankedGames']) == 0:
rankStats['Played'] = False
else:
rankStats['Played'] = True
rankStats['Rating'] = round(float(userInfo['Data']['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3)
return rankStats
# 获取游戏数据
async def getGameData(userData: dict) -> dict[str, bool|int|float]:
gameData = {}
if userData['Data']['data'] == []:
gameData['Played'] = False
else:
gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['Data']['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶不计算没挖掘的局即使apm和lpm也如此
if i['dig'] is None:
break
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weightedTotalLpm += lpm * time
weightedTotalApm += apm * time
weightedTotalAdpm += adpm * time
weightedTotalTime += time
num += 1
if num == 50:
break
gameData['NUM'] = num
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息
return gameData
# 获取PB数据
async def getPBData(userInfo: dict) -> dict[str, bool|float|str]:
PBData = {}
if int(userInfo['Data']['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False
else:
PBData['Sprint'] = round(float(userInfo['Data']['data']['PBSprint']) / 1000, 2)
if int(userInfo['Data']['data']['PBMarathon']) == 0:
PBData['Marathon'] = False
else:
PBData['Marathon'] = userInfo['Data']['data']['PBMarathon']
if int(userInfo['Data']['data']['PBChallenge']) == 0:
PBData['Challenge'] = False
else:
PBData['Challenge'] = userInfo['Data']['data']['PBChallenge']
return PBData
# 生成消息
async def generateMessage(userName: str = None, teaID: int = None) -> str:
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo['Status'] is False:
return f'用户信息请求失败'
if userInfo['Success'] is False:
return f'用户信息请求错误:\n{userInfo["Data"]["error"]}'
rankStats = await getRankStats(userInfo)
PBData = await getPBData(userInfo)
message = ''
if rankStats['Played'] is False:
message += f'用户 {userInfo["Data"]["data"]["name"]}{userInfo["Data"]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True:
message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData['Status'] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData['Status'] is True:
gameData = await getGameData(userData)
if gameData['Played'] is False:
message += ', 暂无游戏数据'
elif gameData['Played'] is True:
message += f', 最近 {gameData["NUM"]} 局数据'
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
message += f'\nAPM{gameData["APM"]} ( x{gameData["APL"]} )'
message += f'\nADPM{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
if PBData['Sprint'] is not False:
message += f'\n40L: {PBData["Sprint"]}s'
if PBData['Marathon']is not False:
message += f'\nMarathon: {PBData["Marathon"]}'
if PBData['Challenge'] is not False:
message += f'\nChallenge: {PBData["Challenge"]}'
return message

View File

@@ -0,0 +1,86 @@
from re import I
from nonebot import get_driver, on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from .MessageAnalyzer import *
from .SQL import *
from .IODataProcessing import generateMessage as IOgenerateMessage
from .IODataProcessing import getUserID as IOgetUserID
from .TOSDataProcessing import generateMessage as TOSgenerateMessage
driver = get_driver()
@driver.on_startup
async def startUP():
await initDB()
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@ioBind.handle()
async def bindIOUser(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'ID':
user = decodedMessage['User']
elif decodedMessage['Type'] == 'Name':
user = await IOgetUserID(userName=decodedMessage['User'])
message = await writeBindInfo(QQNumber=event.sender.user_id, user=user, gameType='IO')
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.send(message=message)
@ioStats.handle()
async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT':
bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '未查询到绑定信息'
elif decodedMessage['Type'] == 'ME':
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '您还没有绑定账号'
elif decodedMessage['Type'] == 'ID':
message = await IOgenerateMessage(userID=decodedMessage['User'])
elif decodedMessage['Type'] == 'Name':
message = await IOgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@tosStats.handle()
async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ':
message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber'])
elif decodedMessage['Type'] == 'ME':
message = await TOSgenerateMessage(teaID=event.sender.user_id)
elif decodedMessage['Type'] == 'Name':
message = await TOSgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@topBind.handle()
async def bindTOPUser(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')
@topStats.handle()
async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')