Compare commits

...

12 Commits
0.1.1 ... 0.2.0

Author SHA1 Message Date
8ed1c20feb 修复tos查询游戏数据时没有有效记录会爆炸的bug 2022-07-25 04:11:42 +08:00
e5abc99590 tos增加一种命令前缀 2022-07-25 04:10:23 +08:00
d6449ddf8c 删除不需要的import 2022-07-24 17:35:41 +08:00
dcbab47833 修复命令前缀大写不能正确识别的bug 2022-07-24 17:25:06 +08:00
scdhh
e65c50e107 Update README.md 2022-07-24 17:25:06 +08:00
4525b84fc4 修复查AT时不能正确识别的bug 2022-07-24 17:25:06 +08:00
6207cdb3d9 修复IO输入用户名时大写查询爆炸的bug 2022-07-24 17:25:06 +08:00
f1291a9923 修复所有type hint(当时写的时候mypy炸了
使用神秘新架构
独立request模块
版本推进
fixed #1 [BUG] 茶服无法查询用户名内带有大写字母的用户
2022-07-24 17:24:37 +08:00
scdhh
0d2b37e78a Create codeql-analysis.yml 2022-05-26 07:38:53 +08:00
scdhh
a8998b01a7 Create LICENSE 2022-05-26 07:35:48 +08:00
df6cb71e3f 添加了对查bot自身的情况的特判 2022-05-25 18:00:09 +08:00
2a19b9fe4c 修复IO绑定用户时 用户不存在导致报错和错误绑定的bug 2022-05-25 14:27:27 +08:00
16 changed files with 594 additions and 469 deletions

72
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,72 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '17 6 * * 5'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

3
.gitignore vendored
View File

@@ -1 +1,4 @@
dist
test*
Untitled*
*copy*

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 scdhh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -12,10 +12,17 @@ TETRIS Stats
安装
----
* 使用 pip
* 使用 nb-cli推荐
```
nb plugin install nonebot-plugin-tetris-stats
```
* 使用 pip不推荐
```
pip install nonebot-plugin-tetris-stats
# 修改bot.py
```
使用

View File

@@ -0,0 +1,211 @@
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from typing import Any, Mapping
from asyncio import gather
from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage
from ..Utils.SQL import queryBindInfo, writeBindInfo
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
@ioBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
if decodedMessage[0] == 'ID':
userIDStats = await checkUserID(userID=decodedMessage[1][1])
if userIDStats[0] is False:
await matcher.finish(userIDStats[1])
else:
userID = decodedMessage[1][1]
elif decodedMessage[0] == 'Name':
userData = await getUserData(userName=decodedMessage[1][1])
if userData[0] is False:
await matcher.finish('用户信息请求失败')
elif userData[1] is False:
await matcher.finish(f'用户信息请求错误:\n{userData[2]["error"]}')
else:
userID = await getUserID(userData=userData[2])
if event.sender.user_id is None: # 理论上是不会有None出现的ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userID, gameType='IO'))
@ioStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
elif decodedMessage[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败请联系bot主人')
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ID':
message = await generateMessage(userID=decodedMessage[1][1])
elif decodedMessage[0] == 'Name':
message = await generateMessage(userName=decodedMessage[1][1])
await matcher.finish(message=message)
async def getUserData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getSoloData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取Solo数据
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getSoloData: 预期外行为请上报GitHub')
return await request(Url=userSoloUrl)
async def getUserID(userData: dict) -> str:
return userData['data']['user']['_id']
async def checkUserID(userID: str) -> tuple[bool, str]:
userData = await getUserData(userID=userID)
if userData[0] is False:
return (False, '用户信息请求失败')
elif userData[1] is False:
return (False, f'用户信息请求错误:\n{userData[2]["error"]}')
elif userID == userData[2]['data']['user']['_id']:
return (True, '')
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.checkUserID: 服务器返回的userID和用户提供的不一致这种情况理论上不应该发生以防万一还是写一下x')
async def getLeagueStats(userData: dict) -> dict[str, Any]:
# 获取排位统计数据
league = userData['data']['user']['league']
leagueStats: dict[str, Any] = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round(
(leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round(
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
async def getSprintStats(soloData: dict) -> Mapping[str, bool | int | float]:
# 获取40L统计数据
sprintStats = {}
if soloData['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
if soloData['data']['records']['40l']['rank'] is None:
sprintStats['Rank'] = False
else:
sprintStats['Rank'] = soloData['data']['records']['40l']['rank']
sprintStats['Time'] = round(
soloData['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
async def getBlitzStats(soloData: dict) -> dict[str, Any]:
# 获取Blitz统计数据
blitzStats = {}
if soloData['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
if soloData['data']['records']['blitz']['rank'] is None:
blitzStats['Rank'] = False
else:
blitzStats['Rank'] = soloData['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
async def generateMessage(userName: str = None, userID: str = None) -> str:
# 生成消息
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
if userData[0] is False:
return '用户信息请求失败'
elif userData[1] is False:
return f'用户信息请求错误:\n{userData[2]["error"]}'
userName = userData[2]['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData[2])
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData[0] is False:
return f'{message}\nSolo统计数据请求失败'
elif soloData[1] is False:
return f'{message}\nSolo统计数据请求错误:\n{soloData[2]["error"]}'
sprintStats, blitzStats = await gather(getSprintStats(soloData[2]), getBlitzStats(soloData[2]))
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -0,0 +1,164 @@
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from typing import Any
from asyncio import gather
from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import handleStatsQueryMessage
tosStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
@tosStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
elif decodedMessage[0] == 'AT' or decodedMessage[0] == 'QQ':
if decodedMessage[1][1] == event.self_id:
await matcher.finish(message='不能查询bot的信息')
message = await generateMessage(teaID=decodedMessage[1][1])
elif decodedMessage[0] == 'ME':
message = await generateMessage(teaID=event.sender.user_id)
elif decodedMessage[0] == 'Name':
message = await generateMessage(userName=decodedMessage[1][1])
await matcher.finish(message=message)
async def getUserInfo(userName: str = None, teaID: int = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户信息
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
elif userName is None and teaID is not None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
else:
raise ValueError(
'[TETRIS STATS] TOSDataProcessing.getUserInfo: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
elif userName is None and teaID is not None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
else:
raise ValueError(
'[TETRIS STATS] TOSDataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
# 获取Rank数据
rankStats: dict[str, bool | float] = {}
if int(userInfo['data']['rankedGames']) == 0:
rankStats['Played'] = False
else:
rankStats['Played'] = True
rankStats['Rating'] = round(
float(userInfo['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['data']['volNow']), 3)
return rankStats
async def getGameData(userData: dict) -> dict[str, bool | int | float]:
# 获取游戏数据
gameData: dict[str, bool | int | float] = {}
if userData['data'] == []:
gameData['Played'] = False
else:
gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶不计算没挖掘的局即使apm和lpm也如此
if i['dig'] is None:
continue
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weightedTotalLpm += lpm * time
weightedTotalApm += apm * time
weightedTotalAdpm += adpm * time
weightedTotalTime += time
num += 1
if num == 50:
break
if num > 0:
gameData['NUM'] = num
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
else:
gameData['Played'] = False
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
return gameData
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
# 获取PB数据
PBData: dict[str, bool | float | str] = {}
if int(userInfo['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False
else:
PBData['Sprint'] = round(
float(userInfo['data']['PBSprint']) / 1000, 2)
if int(userInfo['data']['PBMarathon']) == 0:
PBData['Marathon'] = False
else:
PBData['Marathon'] = userInfo['data']['PBMarathon']
if int(userInfo['data']['PBChallenge']) == 0:
PBData['Challenge'] = False
else:
PBData['Challenge'] = userInfo['data']['PBChallenge']
return PBData
async def generateMessage(userName: str = None, teaID: int = None) -> str:
# 生成消息
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo[0] is False:
return f'用户信息请求失败'
elif userInfo[1] is False:
return f'用户信息请求错误:\n{userInfo[2]["error"]}'
rankStats, PBData = await gather(getRankStats(userInfo[2]), getPBData(userInfo[2]))
message = ''
if rankStats['Played'] is False:
message += f'用户 {userInfo[2]["data"]["name"]}{userInfo[2]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True:
message += f'用户 {userInfo[2]["data"]["name"]} ({userInfo[2]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData[0] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData[1] is False:
message = f'{message.rstrip()}\n游戏数据请求错误:\n{userData[2]["error"]}'
else:
gameData = await getGameData(userData[2])
if gameData['Played'] is False:
message += ', 暂无游戏数据'
elif gameData['Played'] is True:
message += f', 最近 {gameData["NUM"]} 局数据'
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
message += f'\nAPM{gameData["APM"]} ( x{gameData["APL"]} )'
message += f'\nADPM{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
if PBData['Sprint'] is not False:
message += f'\n40L: {PBData["Sprint"]}s'
if PBData['Marathon'] is not False:
message += f'\nMarathon: {PBData["Marathon"]}'
if PBData['Challenge'] is not False:
message += f'\nChallenge: {PBData["Challenge"]}'
return message

View File

@@ -0,0 +1 @@
from . import IODataProcessor, TOSDataProcessor

View File

@@ -1,154 +0,0 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
async def request(Url: str) -> dict[str, bool | dict[str, any]]:
# 封装请求函数
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
# 获取用户数据
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
userData = await request(Url=userDataUrl)
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误')
return userData
async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
# 获取Solo数据
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
soloData = await request(Url=userSoloUrl)
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
soloData = await request(Url=userSoloUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误')
return soloData
async def getUserID(userData: dict = None, userName: str = None) -> str:
# 获取用户ID
if userName is not None and userData is None:
userData = await getUserData(userName=userName)
elif userData is None and userName is None:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误')
return userData['Data']['data']['user']['_id']
async def getLeagueStats(userData: dict) -> dict[str, bool | int | str | float]:
# 获取排位统计数据
league = userData['Data']['data']['user']['league']
leagueStats = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round(
(leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round(
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
async def getSprintStats(soloData: dict) -> dict[str, bool | int | float]:
# 获取40L统计数据
sprintStats = {}
if soloData['Data']['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
if soloData['Data']['data']['records']['40l']['rank'] is None:
sprintStats['Rank'] = False
else:
sprintStats['Rank'] = soloData['Data']['data']['records']['40l']['rank']
sprintStats['Time'] = round(
soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
async def getBlitzStats(soloData: dict) -> dict[str, bool | int]:
# 获取Blitz统计数据
blitzStats = {}
if soloData['Data']['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
if soloData['Data']['data']['records']['blitz']['rank'] is None:
blitzStats['Rank'] = False
else:
blitzStats['Rank'] = soloData['Data']['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
async def generateMessage(userName: str = None, userID: str = None) -> str:
# 生成消息
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
if userData['Status'] is False:
return '用户信息请求失败'
if userData['Success'] is False:
return f'用户信息请求错误:\n{userData["Data"]["error"]}'
userName = userData['Data']['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData)
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData['Status'] is False:
return f'{message}\nSolo统计数据请求失败'
sprintStats = await getSprintStats(soloData)
blitzStats = await getBlitzStats(soloData)
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -1,70 +0,0 @@
from re import match
async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
return await checkName(message, gameType)
async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '', '', 'me']
message = (message.strip()).lower()
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
if message.startswith('[cq:at,qq='):
try:
QQNumber = int((str(message)).split(
'[cq:at,qq=')[1].split(']')[0])
except ValueError:
return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'}
else:
return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber}
elif message in _ME:
return {'Success': True, 'Type': 'ME', 'Message': None}
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> dict[str, bool | str]:
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name}
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
elif name.isdigit() is True:
return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}

View File

@@ -1,152 +0,0 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
async def request(Url: str) -> dict[str, bool | dict[str, any]]:
# 封装请求函数
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool | dict[str, any]]:
# 获取用户信息
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
userInfo = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
userInfo = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误')
return userInfo
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool | dict[str, any]]:
# 获取用户数据
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
userData = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误')
return userData
async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
# 获取Rank数据
rankStats = {}
if int(userInfo['Data']['data']['rankedGames']) == 0:
rankStats['Played'] = False
else:
rankStats['Played'] = True
rankStats['Rating'] = round(
float(userInfo['Data']['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3)
return rankStats
async def getGameData(userData: dict) -> dict[str, bool | int | float]:
# 获取游戏数据
gameData = {}
if userData['Data']['data'] == []:
gameData['Played'] = False
else:
gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['Data']['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶不计算没挖掘的局即使apm和lpm也如此
if i['dig'] is None:
break
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weightedTotalLpm += lpm * time
weightedTotalApm += apm * time
weightedTotalAdpm += adpm * time
weightedTotalTime += time
num += 1
if num == 50:
break
gameData['NUM'] = num
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息
return gameData
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
# 获取PB数据
PBData = {}
if int(userInfo['Data']['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False
else:
PBData['Sprint'] = round(
float(userInfo['Data']['data']['PBSprint']) / 1000, 2)
if int(userInfo['Data']['data']['PBMarathon']) == 0:
PBData['Marathon'] = False
else:
PBData['Marathon'] = userInfo['Data']['data']['PBMarathon']
if int(userInfo['Data']['data']['PBChallenge']) == 0:
PBData['Challenge'] = False
else:
PBData['Challenge'] = userInfo['Data']['data']['PBChallenge']
return PBData
async def generateMessage(userName: str = None, teaID: int = None) -> str:
# 生成消息
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo['Status'] is False:
return f'用户信息请求失败'
if userInfo['Success'] is False:
return f'用户信息请求错误:\n{userInfo["Data"]["error"]}'
rankStats = await getRankStats(userInfo)
PBData = await getPBData(userInfo)
message = ''
if rankStats['Played'] is False:
message += f'用户 {userInfo["Data"]["data"]["name"]}{userInfo["Data"]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True:
message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData['Status'] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData['Status'] is True:
gameData = await getGameData(userData)
if gameData['Played'] is False:
message += ', 暂无游戏数据'
elif gameData['Played'] is True:
message += f', 最近 {gameData["NUM"]} 局数据'
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
message += f'\nAPM{gameData["APM"]} ( x{gameData["APL"]} )'
message += f'\nADPM{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
if PBData['Sprint'] is not False:
message += f'\n40L: {PBData["Sprint"]}s'
if PBData['Marathon'] is not False:
message += f'\nMarathon: {PBData["Marathon"]}'
if PBData['Challenge'] is not False:
message += f'\nChallenge: {PBData["Challenge"]}'
return message

View File

@@ -0,0 +1,79 @@
from re import match, sub
async def handleBindMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError(
'[TETRIS STATS] MessageAnalyzer.handleBindMessage: 预期外行为请上报GitHub')
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
return await checkName(message, gameType)
async def handleStatsQueryMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷',
'鄙人', '寡人', '小生', '贫僧', '本人', '', '', '', '', '', 'me']
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
if message.startswith('[CQ:at,qq='):
try:
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
except ValueError:
return (None, ('QQ号码不合法', None))
else:
return ('AT', (None, user))
elif message in _ME:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return ('ME', (None, None))
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return ('ID', (None, name))
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return ('Name', (None, name.lower()))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return ('Name', (None, name))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return ('Name', (None, name))
elif name.isdigit() is True:
return ('QQ', (None, name))
else:
return (None, ('用户名不合法', None))
else:
return (None, ('游戏类型错误', None))

View File

@@ -0,0 +1,17 @@
from nonebot.log import logger
from typing import Any
import aiohttp
async def request(Url: str) -> tuple[bool, bool, dict[str, Any]]:
# 封装请求函数
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data = await resp.json()
return (True, data['success'], data)
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] request.request: 请求错误\n{e}')
return (False, False, {})

View File

@@ -3,13 +3,17 @@ from nonebot.log import logger
import sqlite3
import os
_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db'
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
async def initDB():
# 初始化数据库
if not os.path.exists(os.path.dirname(_DB_FILE)):
os.makedirs(os.path.dirname(_DB_FILE))
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
os.rename('data/nonebot-plugin-tetris-stats',
os.path.dirname(_DB_FILE))
else:
os.makedirs(os.path.dirname(_DB_FILE))
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
@@ -23,7 +27,7 @@ async def initDB():
logger.info('数据库初始化完成')
async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
async def queryBindInfo(QQNumber: str | int, gameType: str) -> str | None:
# 查询绑定信息
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
@@ -32,21 +36,21 @@ async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
db.commit()
db.close()
if user is None:
return {'Hit': False, 'User': None}
return None
else:
return {'Hit': True, 'User': user[0]}
return user[0]
async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str:
async def writeBindInfo(QQNumber: str | int, user: str, gameType: str) -> str:
# 写入绑定信息
info = await queryBindInfo(QQNumber, gameType)
bindInfo = await queryBindInfo(QQNumber, gameType)
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
if info['Hit'] is True:
if bindInfo is not None:
cursor.execute(
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
message = '更新成功'
elif info['Hit'] is False:
elif bindInfo is None:
cursor.execute(
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
message = '绑定成功'

View File

@@ -0,0 +1 @@
from . import MessageAnalyzer, Request, SQL

View File

@@ -1,15 +1,9 @@
from re import I
from nonebot import get_driver
from nonebot import get_driver, on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from .Utils.SQL import initDB
from .MessageAnalyzer import *
from .SQL import *
from . import GameDataProcessor
from .IODataProcessing import generateMessage as IOgenerateMessage
from .IODataProcessing import getUserID as IOgetUserID
from .TOSDataProcessing import generateMessage as TOSgenerateMessage
driver = get_driver()
@@ -17,76 +11,3 @@ driver = get_driver()
@driver.on_startup
async def startUP():
await initDB()
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@ioBind.handle()
async def bindIOUser(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'ID':
user = decodedMessage['User']
elif decodedMessage['Type'] == 'Name':
user = await IOgetUserID(userName=decodedMessage['User'])
message = await writeBindInfo(QQNumber=event.sender.user_id, user=user, gameType='IO')
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.send(message=message)
@ioStats.handle()
async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT':
bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '未查询到绑定信息'
elif decodedMessage['Type'] == 'ME':
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '您还没有绑定账号'
elif decodedMessage['Type'] == 'ID':
message = await IOgenerateMessage(userID=decodedMessage['User'])
elif decodedMessage['Type'] == 'Name':
message = await IOgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@tosStats.handle()
async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ':
message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber'])
elif decodedMessage['Type'] == 'ME':
message = await TOSgenerateMessage(teaID=event.sender.user_id)
elif decodedMessage['Type'] == 'Name':
message = await TOSgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@topBind.handle()
async def bindTOPUser(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')
@topStats.handle()
async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "nonebot-plugin-tetris-stats"
version = "0.1.1"
version = "0.2.0"
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
authors = ["scdhh <wallfjjd@gmail.com>"]
readme = "README.md"