mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8ed1c20feb | |||
| e5abc99590 | |||
| d6449ddf8c | |||
| dcbab47833 | |||
|
|
e65c50e107 | ||
| 4525b84fc4 | |||
| 6207cdb3d9 | |||
| f1291a9923 | |||
|
|
0d2b37e78a | ||
|
|
a8998b01a7 | ||
| df6cb71e3f | |||
| 2a19b9fe4c |
72
.github/workflows/codeql-analysis.yml
vendored
Normal file
72
.github/workflows/codeql-analysis.yml
vendored
Normal file
@@ -0,0 +1,72 @@
|
||||
# For most projects, this workflow file will not need changing; you simply need
|
||||
# to commit it to your repository.
|
||||
#
|
||||
# You may wish to alter this file to override the set of languages analyzed,
|
||||
# or to provide custom queries or build logic.
|
||||
#
|
||||
# ******** NOTE ********
|
||||
# We have attempted to detect the languages in your repository. Please check
|
||||
# the `language` matrix defined below to confirm you have the correct set of
|
||||
# supported CodeQL languages.
|
||||
#
|
||||
name: "CodeQL"
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
pull_request:
|
||||
# The branches below must be a subset of the branches above
|
||||
branches: [ main ]
|
||||
schedule:
|
||||
- cron: '17 6 * * 5'
|
||||
|
||||
jobs:
|
||||
analyze:
|
||||
name: Analyze
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
actions: read
|
||||
contents: read
|
||||
security-events: write
|
||||
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
language: [ 'python' ]
|
||||
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
|
||||
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v3
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
uses: github/codeql-action/init@v2
|
||||
with:
|
||||
languages: ${{ matrix.language }}
|
||||
# If you wish to specify custom queries, you can do so here or in a config file.
|
||||
# By default, queries listed here will override any specified in a config file.
|
||||
# Prefix the list here with "+" to use these queries and those in the config file.
|
||||
|
||||
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
|
||||
# queries: security-extended,security-and-quality
|
||||
|
||||
|
||||
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
|
||||
# If this step fails, then you should remove it and run the build manually (see below)
|
||||
- name: Autobuild
|
||||
uses: github/codeql-action/autobuild@v2
|
||||
|
||||
# ℹ️ Command-line programs to run using the OS shell.
|
||||
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
|
||||
|
||||
# If the Autobuild fails above, remove it and uncomment the following three lines.
|
||||
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
|
||||
|
||||
# - run: |
|
||||
# echo "Run, Build Application using script"
|
||||
# ./location_of_script_within_repo/buildscript.sh
|
||||
|
||||
- name: Perform CodeQL Analysis
|
||||
uses: github/codeql-action/analyze@v2
|
||||
3
.gitignore
vendored
3
.gitignore
vendored
@@ -1 +1,4 @@
|
||||
dist
|
||||
test*
|
||||
Untitled*
|
||||
*copy*
|
||||
|
||||
21
LICENSE
Normal file
21
LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2022 scdhh
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
@@ -12,10 +12,17 @@ TETRIS Stats
|
||||
安装
|
||||
----
|
||||
|
||||
* 使用 pip
|
||||
* 使用 nb-cli(推荐)
|
||||
|
||||
```
|
||||
nb plugin install nonebot-plugin-tetris-stats
|
||||
```
|
||||
|
||||
* 使用 pip(不推荐)
|
||||
|
||||
```
|
||||
pip install nonebot-plugin-tetris-stats
|
||||
# 修改bot.py
|
||||
```
|
||||
|
||||
使用
|
||||
|
||||
211
nonebot_plugin_tetris_stats/GameDataProcessor/IODataProcessor.py
Normal file
211
nonebot_plugin_tetris_stats/GameDataProcessor/IODataProcessor.py
Normal file
@@ -0,0 +1,211 @@
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
|
||||
from typing import Any, Mapping
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
|
||||
from ..Utils.Request import request
|
||||
|
||||
from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage
|
||||
from ..Utils.SQL import queryBindInfo, writeBindInfo
|
||||
|
||||
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@ioBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO')
|
||||
if decodedMessage[0] is None:
|
||||
await matcher.finish(decodedMessage[1][0])
|
||||
if decodedMessage[0] == 'ID':
|
||||
userIDStats = await checkUserID(userID=decodedMessage[1][1])
|
||||
if userIDStats[0] is False:
|
||||
await matcher.finish(userIDStats[1])
|
||||
else:
|
||||
userID = decodedMessage[1][1]
|
||||
elif decodedMessage[0] == 'Name':
|
||||
userData = await getUserData(userName=decodedMessage[1][1])
|
||||
if userData[0] is False:
|
||||
await matcher.finish('用户信息请求失败')
|
||||
elif userData[1] is False:
|
||||
await matcher.finish(f'用户信息请求错误:\n{userData[2]["error"]}')
|
||||
else:
|
||||
userID = await getUserID(userData=userData[2])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的,ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userID, gameType='IO'))
|
||||
|
||||
|
||||
@ioStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO')
|
||||
if decodedMessage[0] is None:
|
||||
await matcher.finish(decodedMessage[1][0])
|
||||
elif decodedMessage[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='IO')
|
||||
if bindInfo is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
|
||||
elif decodedMessage[0] == 'ME':
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败,请联系bot主人')
|
||||
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
|
||||
if bindInfo is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
|
||||
elif decodedMessage[0] == 'ID':
|
||||
message = await generateMessage(userID=decodedMessage[1][1])
|
||||
elif decodedMessage[0] == 'Name':
|
||||
message = await generateMessage(userName=decodedMessage[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
async def getUserData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
|
||||
# 获取用户数据
|
||||
if userName is not None and userID is None:
|
||||
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
|
||||
elif userName is None and userID is not None:
|
||||
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] IODataProcessing.getUserData: 预期外行为,请上报GitHub')
|
||||
return await request(Url=userDataUrl)
|
||||
|
||||
|
||||
async def getSoloData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
|
||||
# 获取Solo数据
|
||||
if userName is not None and userID is None:
|
||||
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
|
||||
elif userName is None and userID is not None:
|
||||
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] IODataProcessing.getSoloData: 预期外行为,请上报GitHub')
|
||||
return await request(Url=userSoloUrl)
|
||||
|
||||
|
||||
async def getUserID(userData: dict) -> str:
|
||||
return userData['data']['user']['_id']
|
||||
|
||||
|
||||
async def checkUserID(userID: str) -> tuple[bool, str]:
|
||||
userData = await getUserData(userID=userID)
|
||||
if userData[0] is False:
|
||||
return (False, '用户信息请求失败')
|
||||
elif userData[1] is False:
|
||||
return (False, f'用户信息请求错误:\n{userData[2]["error"]}')
|
||||
elif userID == userData[2]['data']['user']['_id']:
|
||||
return (True, '')
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] IODataProcessing.checkUserID: 服务器返回的userID和用户提供的不一致,这种情况理论上不应该发生,以防万一还是写一下(x')
|
||||
|
||||
|
||||
async def getLeagueStats(userData: dict) -> dict[str, Any]:
|
||||
# 获取排位统计数据
|
||||
league = userData['data']['user']['league']
|
||||
leagueStats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] == 0:
|
||||
leagueStats['Played'] = False
|
||||
else:
|
||||
leagueStats['Played'] = True
|
||||
leagueStats['PPS'] = league['pps']
|
||||
leagueStats['APM'] = league['apm']
|
||||
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
|
||||
if league['rating'] != -1:
|
||||
leagueStats['Ranked'] = True
|
||||
leagueStats['Rating'] = round(league['rating'], 2)
|
||||
leagueStats['Glicko'] = round(league['glicko'], 2)
|
||||
leagueStats['RD'] = round(league['rd'], 2)
|
||||
else:
|
||||
leagueStats['Ranked'] = False
|
||||
leagueStats['Standing'] = league['standing']
|
||||
leagueStats['LPM'] = round((league['pps'] * 24), 2)
|
||||
leagueStats['APL'] = round(
|
||||
(leagueStats['APM'] / leagueStats['LPM']), 2)
|
||||
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
|
||||
leagueStats['ADPL'] = round(
|
||||
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
|
||||
return leagueStats
|
||||
|
||||
|
||||
async def getSprintStats(soloData: dict) -> Mapping[str, bool | int | float]:
|
||||
# 获取40L统计数据
|
||||
sprintStats = {}
|
||||
if soloData['data']['records']['40l']['record'] is None:
|
||||
sprintStats['Played'] = False
|
||||
else:
|
||||
sprintStats['Played'] = True
|
||||
if soloData['data']['records']['40l']['rank'] is None:
|
||||
sprintStats['Rank'] = False
|
||||
else:
|
||||
sprintStats['Rank'] = soloData['data']['records']['40l']['rank']
|
||||
sprintStats['Time'] = round(
|
||||
soloData['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
return sprintStats
|
||||
|
||||
|
||||
async def getBlitzStats(soloData: dict) -> dict[str, Any]:
|
||||
# 获取Blitz统计数据
|
||||
blitzStats = {}
|
||||
if soloData['data']['records']['blitz']['record'] is None:
|
||||
blitzStats['Played'] = False
|
||||
else:
|
||||
blitzStats['Played'] = True
|
||||
if soloData['data']['records']['blitz']['rank'] is None:
|
||||
blitzStats['Rank'] = False
|
||||
else:
|
||||
blitzStats['Rank'] = soloData['data']['records']['blitz']['rank']
|
||||
blitzStats['Score'] = soloData['data']['records']['blitz']['record']['endcontext']['score']
|
||||
return blitzStats
|
||||
|
||||
|
||||
async def generateMessage(userName: str = None, userID: str = None) -> str:
|
||||
# 生成消息
|
||||
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
|
||||
if userData[0] is False:
|
||||
return '用户信息请求失败'
|
||||
elif userData[1] is False:
|
||||
return f'用户信息请求错误:\n{userData[2]["error"]}'
|
||||
userName = userData[2]['data']['user']['username'].upper()
|
||||
leagueStats = await getLeagueStats(userData[2])
|
||||
message = ''
|
||||
if leagueStats['Played'] is False:
|
||||
message += f'用户 {userName} 没有排位统计数据'
|
||||
else:
|
||||
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
|
||||
message += f'用户 {userName} 暂未完成定级赛'
|
||||
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
|
||||
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
|
||||
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
|
||||
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
|
||||
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
|
||||
if leagueStats["VS"] != 0:
|
||||
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
|
||||
if soloData[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
elif soloData[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{soloData[2]["error"]}'
|
||||
sprintStats, blitzStats = await gather(getSprintStats(soloData[2]), getBlitzStats(soloData[2]))
|
||||
if sprintStats['Played'] is True:
|
||||
message += f'\n40L: {sprintStats["Time"]}s'
|
||||
if sprintStats['Rank'] is not False:
|
||||
message += f' ( #{sprintStats["Rank"]} )'
|
||||
if blitzStats['Played'] is True:
|
||||
message += f'\nBlitz: {blitzStats["Score"]}'
|
||||
if blitzStats['Rank'] is not False:
|
||||
message += f' ( #{blitzStats["Rank"]} )'
|
||||
return message
|
||||
@@ -0,0 +1,164 @@
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
|
||||
from ..Utils.Request import request
|
||||
|
||||
from ..Utils.MessageAnalyzer import handleStatsQueryMessage
|
||||
|
||||
tosStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
|
||||
flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@tosStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS')
|
||||
if decodedMessage[0] is None:
|
||||
await matcher.finish(decodedMessage[1][0])
|
||||
elif decodedMessage[0] == 'AT' or decodedMessage[0] == 'QQ':
|
||||
if decodedMessage[1][1] == event.self_id:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
message = await generateMessage(teaID=decodedMessage[1][1])
|
||||
elif decodedMessage[0] == 'ME':
|
||||
message = await generateMessage(teaID=event.sender.user_id)
|
||||
elif decodedMessage[0] == 'Name':
|
||||
message = await generateMessage(userName=decodedMessage[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
async def getUserInfo(userName: str = None, teaID: int = None) -> tuple[bool, bool, dict[str, Any]]:
|
||||
# 获取用户信息
|
||||
if userName is not None and teaID is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
|
||||
elif userName is None and teaID is not None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] TOSDataProcessing.getUserInfo: 预期外行为,请上报GitHub')
|
||||
return await request(Url=userDataUrl)
|
||||
|
||||
|
||||
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> tuple[bool, bool, dict[str, Any]]:
|
||||
# 获取用户数据
|
||||
if userName is not None and teaID is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
|
||||
elif userName is None and teaID is not None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] TOSDataProcessing.getUserData: 预期外行为,请上报GitHub')
|
||||
return await request(Url=userDataUrl)
|
||||
|
||||
|
||||
async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
|
||||
# 获取Rank数据
|
||||
rankStats: dict[str, bool | float] = {}
|
||||
if int(userInfo['data']['rankedGames']) == 0:
|
||||
rankStats['Played'] = False
|
||||
else:
|
||||
rankStats['Played'] = True
|
||||
rankStats['Rating'] = round(
|
||||
float(userInfo['data']['ratingNow']), 2)
|
||||
rankStats['RD'] = round(float(userInfo['data']['rdNow']), 2)
|
||||
rankStats['Vol'] = round(float(userInfo['data']['volNow']), 3)
|
||||
return rankStats
|
||||
|
||||
|
||||
async def getGameData(userData: dict) -> dict[str, bool | int | float]:
|
||||
# 获取游戏数据
|
||||
gameData: dict[str, bool | int | float] = {}
|
||||
if userData['data'] == []:
|
||||
gameData['Played'] = False
|
||||
else:
|
||||
gameData['Played'] = True
|
||||
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
|
||||
for i in userData['data']:
|
||||
# 排除单人局和时间为0的游戏
|
||||
if i['num_players'] == 1 or i['time'] == 0:
|
||||
continue
|
||||
# 茶:不计算没挖掘的局,即使apm和lpm也如此
|
||||
if i['dig'] is None:
|
||||
continue
|
||||
# 加权计算
|
||||
time = i['time'] / 1000
|
||||
lpm = 24 * (i['pieces'] / time)
|
||||
apm = (i['attack'] / time) * 60
|
||||
adpm = ((i['attack'] + i['dig']) / time) * 60
|
||||
weightedTotalLpm += lpm * time
|
||||
weightedTotalApm += apm * time
|
||||
weightedTotalAdpm += adpm * time
|
||||
weightedTotalTime += time
|
||||
num += 1
|
||||
if num == 50:
|
||||
break
|
||||
if num > 0:
|
||||
gameData['NUM'] = num
|
||||
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
|
||||
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
|
||||
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
|
||||
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
|
||||
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
|
||||
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
|
||||
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
|
||||
else:
|
||||
gameData['Played'] = False
|
||||
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
|
||||
return gameData
|
||||
|
||||
|
||||
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
|
||||
# 获取PB数据
|
||||
PBData: dict[str, bool | float | str] = {}
|
||||
if int(userInfo['data']['PBSprint']) == 2147483647:
|
||||
PBData['Sprint'] = False
|
||||
else:
|
||||
PBData['Sprint'] = round(
|
||||
float(userInfo['data']['PBSprint']) / 1000, 2)
|
||||
if int(userInfo['data']['PBMarathon']) == 0:
|
||||
PBData['Marathon'] = False
|
||||
else:
|
||||
PBData['Marathon'] = userInfo['data']['PBMarathon']
|
||||
if int(userInfo['data']['PBChallenge']) == 0:
|
||||
PBData['Challenge'] = False
|
||||
else:
|
||||
PBData['Challenge'] = userInfo['data']['PBChallenge']
|
||||
return PBData
|
||||
|
||||
|
||||
async def generateMessage(userName: str = None, teaID: int = None) -> str:
|
||||
# 生成消息
|
||||
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
|
||||
if userInfo[0] is False:
|
||||
return f'用户信息请求失败'
|
||||
elif userInfo[1] is False:
|
||||
return f'用户信息请求错误:\n{userInfo[2]["error"]}'
|
||||
rankStats, PBData = await gather(getRankStats(userInfo[2]), getPBData(userInfo[2]))
|
||||
message = ''
|
||||
if rankStats['Played'] is False:
|
||||
message += f'用户 {userInfo[2]["data"]["name"]}({userInfo[2]["data"]["teaId"]})暂无段位统计数据'
|
||||
elif rankStats['Played'] is True:
|
||||
message += f'用户 {userInfo[2]["data"]["name"]} ({userInfo[2]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
|
||||
if userData[0] is False:
|
||||
message = f'{message.rstrip()}\n游戏数据请求失败'
|
||||
elif userData[1] is False:
|
||||
message = f'{message.rstrip()}\n游戏数据请求错误:\n{userData[2]["error"]}'
|
||||
else:
|
||||
gameData = await getGameData(userData[2])
|
||||
if gameData['Played'] is False:
|
||||
message += ', 暂无游戏数据'
|
||||
elif gameData['Played'] is True:
|
||||
message += f', 最近 {gameData["NUM"]} 局数据'
|
||||
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
|
||||
message += f'\nAPM:{gameData["APM"]} ( x{gameData["APL"]} )'
|
||||
message += f'\nADPM:{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
|
||||
if PBData['Sprint'] is not False:
|
||||
message += f'\n40L: {PBData["Sprint"]}s'
|
||||
if PBData['Marathon'] is not False:
|
||||
message += f'\nMarathon: {PBData["Marathon"]}'
|
||||
if PBData['Challenge'] is not False:
|
||||
message += f'\nChallenge: {PBData["Challenge"]}'
|
||||
return message
|
||||
@@ -0,0 +1 @@
|
||||
from . import IODataProcessor, TOSDataProcessor
|
||||
@@ -1,154 +0,0 @@
|
||||
from nonebot.log import logger
|
||||
|
||||
from asyncio import gather
|
||||
import aiohttp
|
||||
|
||||
|
||||
async def request(Url: str) -> dict[str, bool | dict[str, any]]:
|
||||
# 封装请求函数
|
||||
data = {}
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(Url) as resp:
|
||||
data['Status'] = True
|
||||
data['Data'] = await resp.json()
|
||||
data['Success'] = data['Data']['success']
|
||||
except aiohttp.client_exceptions.ClientConnectorError as e:
|
||||
logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}')
|
||||
data['Status'] = False
|
||||
finally:
|
||||
return data
|
||||
|
||||
|
||||
async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
|
||||
# 获取用户数据
|
||||
if userName is not None and userID is None:
|
||||
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
|
||||
userData = await request(Url=userDataUrl)
|
||||
elif userName is None and userID is not None:
|
||||
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
|
||||
userData = await request(Url=userDataUrl)
|
||||
else:
|
||||
raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误')
|
||||
return userData
|
||||
|
||||
|
||||
async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
|
||||
# 获取Solo数据
|
||||
if userName is not None and userID is None:
|
||||
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
|
||||
soloData = await request(Url=userSoloUrl)
|
||||
elif userName is None and userID is not None:
|
||||
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
|
||||
soloData = await request(Url=userSoloUrl)
|
||||
else:
|
||||
raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误')
|
||||
return soloData
|
||||
|
||||
|
||||
async def getUserID(userData: dict = None, userName: str = None) -> str:
|
||||
# 获取用户ID
|
||||
if userName is not None and userData is None:
|
||||
userData = await getUserData(userName=userName)
|
||||
elif userData is None and userName is None:
|
||||
raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误')
|
||||
return userData['Data']['data']['user']['_id']
|
||||
|
||||
|
||||
async def getLeagueStats(userData: dict) -> dict[str, bool | int | str | float]:
|
||||
# 获取排位统计数据
|
||||
league = userData['Data']['data']['user']['league']
|
||||
leagueStats = {}
|
||||
if league['gamesplayed'] == 0:
|
||||
leagueStats['Played'] = False
|
||||
else:
|
||||
leagueStats['Played'] = True
|
||||
leagueStats['PPS'] = league['pps']
|
||||
leagueStats['APM'] = league['apm']
|
||||
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
|
||||
if league['rating'] != -1:
|
||||
leagueStats['Ranked'] = True
|
||||
leagueStats['Rating'] = round(league['rating'], 2)
|
||||
leagueStats['Glicko'] = round(league['glicko'], 2)
|
||||
leagueStats['RD'] = round(league['rd'], 2)
|
||||
else:
|
||||
leagueStats['Ranked'] = False
|
||||
leagueStats['Standing'] = league['standing']
|
||||
leagueStats['LPM'] = round((league['pps'] * 24), 2)
|
||||
leagueStats['APL'] = round(
|
||||
(leagueStats['APM'] / leagueStats['LPM']), 2)
|
||||
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
|
||||
leagueStats['ADPL'] = round(
|
||||
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
|
||||
return leagueStats
|
||||
|
||||
|
||||
async def getSprintStats(soloData: dict) -> dict[str, bool | int | float]:
|
||||
# 获取40L统计数据
|
||||
sprintStats = {}
|
||||
if soloData['Data']['data']['records']['40l']['record'] is None:
|
||||
sprintStats['Played'] = False
|
||||
else:
|
||||
sprintStats['Played'] = True
|
||||
if soloData['Data']['data']['records']['40l']['rank'] is None:
|
||||
sprintStats['Rank'] = False
|
||||
else:
|
||||
sprintStats['Rank'] = soloData['Data']['data']['records']['40l']['rank']
|
||||
sprintStats['Time'] = round(
|
||||
soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
return sprintStats
|
||||
|
||||
|
||||
async def getBlitzStats(soloData: dict) -> dict[str, bool | int]:
|
||||
# 获取Blitz统计数据
|
||||
blitzStats = {}
|
||||
if soloData['Data']['data']['records']['blitz']['record'] is None:
|
||||
blitzStats['Played'] = False
|
||||
else:
|
||||
blitzStats['Played'] = True
|
||||
if soloData['Data']['data']['records']['blitz']['rank'] is None:
|
||||
blitzStats['Rank'] = False
|
||||
else:
|
||||
blitzStats['Rank'] = soloData['Data']['data']['records']['blitz']['rank']
|
||||
blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score']
|
||||
return blitzStats
|
||||
|
||||
|
||||
async def generateMessage(userName: str = None, userID: str = None) -> str:
|
||||
# 生成消息
|
||||
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
|
||||
if userData['Status'] is False:
|
||||
return '用户信息请求失败'
|
||||
if userData['Success'] is False:
|
||||
return f'用户信息请求错误:\n{userData["Data"]["error"]}'
|
||||
userName = userData['Data']['data']['user']['username'].upper()
|
||||
leagueStats = await getLeagueStats(userData)
|
||||
message = ''
|
||||
if leagueStats['Played'] is False:
|
||||
message += f'用户 {userName} 没有排位统计数据'
|
||||
else:
|
||||
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
|
||||
message += f'用户 {userName} 暂未完成定级赛'
|
||||
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
|
||||
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
|
||||
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
|
||||
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
|
||||
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
|
||||
if leagueStats["VS"] != 0:
|
||||
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
|
||||
if soloData['Status'] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
sprintStats = await getSprintStats(soloData)
|
||||
blitzStats = await getBlitzStats(soloData)
|
||||
if sprintStats['Played'] is True:
|
||||
message += f'\n40L: {sprintStats["Time"]}s'
|
||||
if sprintStats['Rank'] is not False:
|
||||
message += f' ( #{sprintStats["Rank"]} )'
|
||||
if blitzStats['Played'] is True:
|
||||
message += f'\nBlitz: {blitzStats["Score"]}'
|
||||
if blitzStats['Rank'] is not False:
|
||||
message += f' ( #{blitzStats["Rank"]} )'
|
||||
return message
|
||||
@@ -1,70 +0,0 @@
|
||||
from re import match
|
||||
|
||||
|
||||
async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]:
|
||||
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
|
||||
'TOP': ['top绑定', 'topbind']}
|
||||
# 剔除命令前缀
|
||||
for i in _CMD_ALIASES[gameType]:
|
||||
if message.startswith(i):
|
||||
message = message.replace(i, '')
|
||||
message = message.strip()
|
||||
break
|
||||
if message == '' or message.isspace():
|
||||
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
|
||||
else:
|
||||
return await checkName(message, gameType)
|
||||
|
||||
|
||||
async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]:
|
||||
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
|
||||
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
|
||||
'TOP': ['top查', 'topstats']}
|
||||
_ME = ['我', '自己', '私', '俺', 'me']
|
||||
message = (message.strip()).lower()
|
||||
# 剔除命令前缀
|
||||
for i in _CMD_ALIASES[gameType]:
|
||||
if message.startswith(i):
|
||||
message = message.replace(i, '')
|
||||
message = message.strip()
|
||||
break
|
||||
if message == '' or message.isspace():
|
||||
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
|
||||
else:
|
||||
if message.startswith('[cq:at,qq='):
|
||||
try:
|
||||
QQNumber = int((str(message)).split(
|
||||
'[cq:at,qq=')[1].split(']')[0])
|
||||
except ValueError:
|
||||
return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'}
|
||||
else:
|
||||
return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber}
|
||||
elif message in _ME:
|
||||
return {'Success': True, 'Type': 'ME', 'Message': None}
|
||||
else:
|
||||
return await checkName(message, gameType)
|
||||
|
||||
|
||||
async def checkName(name: str, gameType: str) -> dict[str, bool | str]:
|
||||
if gameType == 'IO':
|
||||
if match(r'^[a-f0-9]{24}$', name):
|
||||
return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name}
|
||||
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
|
||||
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
|
||||
else:
|
||||
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
|
||||
elif gameType == 'TOP':
|
||||
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
|
||||
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
|
||||
else:
|
||||
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
|
||||
elif gameType == 'TOS':
|
||||
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
|
||||
and name.isdigit() is False
|
||||
and 2 <= len(name) <= 18):
|
||||
# 虽然我也不想这么长 但是似乎确实得这么长
|
||||
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
|
||||
elif name.isdigit() is True:
|
||||
return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name}
|
||||
else:
|
||||
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
|
||||
@@ -1,152 +0,0 @@
|
||||
from nonebot.log import logger
|
||||
|
||||
from asyncio import gather
|
||||
import aiohttp
|
||||
|
||||
|
||||
async def request(Url: str) -> dict[str, bool | dict[str, any]]:
|
||||
# 封装请求函数
|
||||
data = {}
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(Url) as resp:
|
||||
data['Status'] = True
|
||||
data['Data'] = await resp.json()
|
||||
data['Success'] = data['Data']['success']
|
||||
except aiohttp.client_exceptions.ClientConnectorError as e:
|
||||
logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}')
|
||||
data['Status'] = False
|
||||
finally:
|
||||
return data
|
||||
|
||||
|
||||
async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool | dict[str, any]]:
|
||||
# 获取用户信息
|
||||
if userName is not None and teaID is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
|
||||
userInfo = await request(Url=userDataUrl)
|
||||
elif teaID is not None and userName is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
|
||||
userInfo = await request(Url=userDataUrl)
|
||||
else:
|
||||
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误')
|
||||
return userInfo
|
||||
|
||||
|
||||
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool | dict[str, any]]:
|
||||
# 获取用户数据
|
||||
if userName is not None and teaID is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
|
||||
userData = await request(Url=userDataUrl)
|
||||
elif teaID is not None and userName is None:
|
||||
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
|
||||
userData = await request(Url=userDataUrl)
|
||||
else:
|
||||
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误')
|
||||
return userData
|
||||
|
||||
|
||||
async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
|
||||
# 获取Rank数据
|
||||
rankStats = {}
|
||||
if int(userInfo['Data']['data']['rankedGames']) == 0:
|
||||
rankStats['Played'] = False
|
||||
else:
|
||||
rankStats['Played'] = True
|
||||
rankStats['Rating'] = round(
|
||||
float(userInfo['Data']['data']['ratingNow']), 2)
|
||||
rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2)
|
||||
rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3)
|
||||
return rankStats
|
||||
|
||||
|
||||
async def getGameData(userData: dict) -> dict[str, bool | int | float]:
|
||||
# 获取游戏数据
|
||||
gameData = {}
|
||||
if userData['Data']['data'] == []:
|
||||
gameData['Played'] = False
|
||||
else:
|
||||
gameData['Played'] = True
|
||||
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
|
||||
for i in userData['Data']['data']:
|
||||
# 排除单人局和时间为0的游戏
|
||||
if i['num_players'] == 1 or i['time'] == 0:
|
||||
continue
|
||||
# 茶:不计算没挖掘的局,即使apm和lpm也如此
|
||||
if i['dig'] is None:
|
||||
break
|
||||
# 加权计算
|
||||
time = i['time'] / 1000
|
||||
lpm = 24 * (i['pieces'] / time)
|
||||
apm = (i['attack'] / time) * 60
|
||||
adpm = ((i['attack'] + i['dig']) / time) * 60
|
||||
weightedTotalLpm += lpm * time
|
||||
weightedTotalApm += apm * time
|
||||
weightedTotalAdpm += adpm * time
|
||||
weightedTotalTime += time
|
||||
num += 1
|
||||
if num == 50:
|
||||
break
|
||||
gameData['NUM'] = num
|
||||
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
|
||||
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
|
||||
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
|
||||
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
|
||||
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
|
||||
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
|
||||
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
|
||||
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息
|
||||
return gameData
|
||||
|
||||
|
||||
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
|
||||
# 获取PB数据
|
||||
PBData = {}
|
||||
if int(userInfo['Data']['data']['PBSprint']) == 2147483647:
|
||||
PBData['Sprint'] = False
|
||||
else:
|
||||
PBData['Sprint'] = round(
|
||||
float(userInfo['Data']['data']['PBSprint']) / 1000, 2)
|
||||
if int(userInfo['Data']['data']['PBMarathon']) == 0:
|
||||
PBData['Marathon'] = False
|
||||
else:
|
||||
PBData['Marathon'] = userInfo['Data']['data']['PBMarathon']
|
||||
if int(userInfo['Data']['data']['PBChallenge']) == 0:
|
||||
PBData['Challenge'] = False
|
||||
else:
|
||||
PBData['Challenge'] = userInfo['Data']['data']['PBChallenge']
|
||||
return PBData
|
||||
|
||||
|
||||
async def generateMessage(userName: str = None, teaID: int = None) -> str:
|
||||
# 生成消息
|
||||
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
|
||||
if userInfo['Status'] is False:
|
||||
return f'用户信息请求失败'
|
||||
if userInfo['Success'] is False:
|
||||
return f'用户信息请求错误:\n{userInfo["Data"]["error"]}'
|
||||
rankStats = await getRankStats(userInfo)
|
||||
PBData = await getPBData(userInfo)
|
||||
message = ''
|
||||
if rankStats['Played'] is False:
|
||||
message += f'用户 {userInfo["Data"]["data"]["name"]}({userInfo["Data"]["data"]["teaId"]})暂无段位统计数据'
|
||||
elif rankStats['Played'] is True:
|
||||
message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
|
||||
if userData['Status'] is False:
|
||||
message = f'{message.rstrip()}\n游戏数据请求失败'
|
||||
elif userData['Status'] is True:
|
||||
gameData = await getGameData(userData)
|
||||
if gameData['Played'] is False:
|
||||
message += ', 暂无游戏数据'
|
||||
elif gameData['Played'] is True:
|
||||
message += f', 最近 {gameData["NUM"]} 局数据'
|
||||
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
|
||||
message += f'\nAPM:{gameData["APM"]} ( x{gameData["APL"]} )'
|
||||
message += f'\nADPM:{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
|
||||
if PBData['Sprint'] is not False:
|
||||
message += f'\n40L: {PBData["Sprint"]}s'
|
||||
if PBData['Marathon'] is not False:
|
||||
message += f'\nMarathon: {PBData["Marathon"]}'
|
||||
if PBData['Challenge'] is not False:
|
||||
message += f'\nChallenge: {PBData["Challenge"]}'
|
||||
return message
|
||||
79
nonebot_plugin_tetris_stats/Utils/MessageAnalyzer.py
Normal file
79
nonebot_plugin_tetris_stats/Utils/MessageAnalyzer.py
Normal file
@@ -0,0 +1,79 @@
|
||||
from re import match, sub
|
||||
|
||||
|
||||
async def handleBindMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
|
||||
'''返回值为tuple[gameType, tuple[message, user]]'''
|
||||
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
|
||||
'TOP': ['top绑定', 'topbind']}
|
||||
# 剔除命令前缀
|
||||
for i in _CMD_ALIASES[gameType]:
|
||||
if match(rf'(?i){i}', message):
|
||||
message = sub(rf'(?i){i}', '', message)
|
||||
message = message.strip()
|
||||
break
|
||||
else:
|
||||
raise ValueError(
|
||||
'[TETRIS STATS] MessageAnalyzer.handleBindMessage: 预期外行为,请上报GitHub')
|
||||
if message == '' or message.isspace():
|
||||
return (None, ('用户名为空', None))
|
||||
else:
|
||||
return await checkName(message, gameType)
|
||||
|
||||
|
||||
async def handleStatsQueryMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
|
||||
'''返回值为tuple[gameType, tuple[message, user]]'''
|
||||
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
|
||||
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
|
||||
'TOP': ['top查', 'topstats']}
|
||||
_ME = ['我', '自己', '我等', '卑人', '愚', '老身', '爷', '老娘', '本姑娘', '本大爷',
|
||||
'鄙人', '寡人', '小生', '贫僧', '本人', '孤', '吾', '俺', '咱', '私', 'me']
|
||||
# 剔除命令前缀
|
||||
for i in _CMD_ALIASES[gameType]:
|
||||
if match(rf'(?i){i}', message):
|
||||
message = sub(rf'(?i){i}', '', message)
|
||||
message = message.strip()
|
||||
break
|
||||
if message == '' or message.isspace():
|
||||
return (None, ('用户名为空', None))
|
||||
else:
|
||||
if message.startswith('[CQ:at,qq='):
|
||||
try:
|
||||
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
|
||||
except ValueError:
|
||||
return (None, ('QQ号码不合法', None))
|
||||
else:
|
||||
return ('AT', (None, user))
|
||||
elif message in _ME:
|
||||
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
|
||||
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
|
||||
return ('ME', (None, None))
|
||||
else:
|
||||
return await checkName(message, gameType)
|
||||
|
||||
|
||||
async def checkName(name: str, gameType: str) -> tuple[str | None, tuple]:
|
||||
'''返回值为tuple[gameType, tuple[message, user]]'''
|
||||
if gameType == 'IO':
|
||||
if match(r'^[a-f0-9]{24}$', name):
|
||||
return ('ID', (None, name))
|
||||
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
|
||||
return ('Name', (None, name.lower()))
|
||||
else:
|
||||
return (None, ('用户名不合法', None))
|
||||
elif gameType == 'TOP':
|
||||
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
|
||||
return ('Name', (None, name))
|
||||
else:
|
||||
return (None, ('用户名不合法', None))
|
||||
elif gameType == 'TOS':
|
||||
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
|
||||
and name.isdigit() is False
|
||||
and 2 <= len(name) <= 18):
|
||||
# 虽然我也不想这么长 但是似乎确实得这么长
|
||||
return ('Name', (None, name))
|
||||
elif name.isdigit() is True:
|
||||
return ('QQ', (None, name))
|
||||
else:
|
||||
return (None, ('用户名不合法', None))
|
||||
else:
|
||||
return (None, ('游戏类型错误', None))
|
||||
17
nonebot_plugin_tetris_stats/Utils/Request.py
Normal file
17
nonebot_plugin_tetris_stats/Utils/Request.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from nonebot.log import logger
|
||||
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
|
||||
|
||||
async def request(Url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
# 封装请求函数
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(Url) as resp:
|
||||
data = await resp.json()
|
||||
return (True, data['success'], data)
|
||||
except aiohttp.client_exceptions.ClientConnectorError as e:
|
||||
logger.error(f'[TETRIS STATS] request.request: 请求错误\n{e}')
|
||||
return (False, False, {})
|
||||
@@ -3,13 +3,17 @@ from nonebot.log import logger
|
||||
import sqlite3
|
||||
import os
|
||||
|
||||
_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db'
|
||||
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
|
||||
|
||||
|
||||
async def initDB():
|
||||
# 初始化数据库
|
||||
if not os.path.exists(os.path.dirname(_DB_FILE)):
|
||||
os.makedirs(os.path.dirname(_DB_FILE))
|
||||
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
|
||||
os.rename('data/nonebot-plugin-tetris-stats',
|
||||
os.path.dirname(_DB_FILE))
|
||||
else:
|
||||
os.makedirs(os.path.dirname(_DB_FILE))
|
||||
db = sqlite3.connect(_DB_FILE)
|
||||
cursor = db.cursor()
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
|
||||
@@ -23,7 +27,7 @@ async def initDB():
|
||||
logger.info('数据库初始化完成')
|
||||
|
||||
|
||||
async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
|
||||
async def queryBindInfo(QQNumber: str | int, gameType: str) -> str | None:
|
||||
# 查询绑定信息
|
||||
db = sqlite3.connect(_DB_FILE)
|
||||
cursor = db.cursor()
|
||||
@@ -32,21 +36,21 @@ async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
|
||||
db.commit()
|
||||
db.close()
|
||||
if user is None:
|
||||
return {'Hit': False, 'User': None}
|
||||
return None
|
||||
else:
|
||||
return {'Hit': True, 'User': user[0]}
|
||||
return user[0]
|
||||
|
||||
|
||||
async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str:
|
||||
async def writeBindInfo(QQNumber: str | int, user: str, gameType: str) -> str:
|
||||
# 写入绑定信息
|
||||
info = await queryBindInfo(QQNumber, gameType)
|
||||
bindInfo = await queryBindInfo(QQNumber, gameType)
|
||||
db = sqlite3.connect(_DB_FILE)
|
||||
cursor = db.cursor()
|
||||
if info['Hit'] is True:
|
||||
if bindInfo is not None:
|
||||
cursor.execute(
|
||||
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
|
||||
message = '更新成功'
|
||||
elif info['Hit'] is False:
|
||||
elif bindInfo is None:
|
||||
cursor.execute(
|
||||
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
|
||||
message = '绑定成功'
|
||||
1
nonebot_plugin_tetris_stats/Utils/__init__.py
Normal file
1
nonebot_plugin_tetris_stats/Utils/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from . import MessageAnalyzer, Request, SQL
|
||||
@@ -1,15 +1,9 @@
|
||||
from re import I
|
||||
from nonebot import get_driver
|
||||
|
||||
from nonebot import get_driver, on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from .Utils.SQL import initDB
|
||||
|
||||
from .MessageAnalyzer import *
|
||||
from .SQL import *
|
||||
from . import GameDataProcessor
|
||||
|
||||
from .IODataProcessing import generateMessage as IOgenerateMessage
|
||||
from .IODataProcessing import getUserID as IOgetUserID
|
||||
from .TOSDataProcessing import generateMessage as TOSgenerateMessage
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
@@ -17,76 +11,3 @@ driver = get_driver()
|
||||
@driver.on_startup
|
||||
async def startUP():
|
||||
await initDB()
|
||||
|
||||
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
|
||||
flags=I, permission=GROUP)
|
||||
|
||||
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
|
||||
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@ioBind.handle()
|
||||
async def bindIOUser(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO')
|
||||
if decodedMessage['Success'] is True:
|
||||
if decodedMessage['Type'] == 'ID':
|
||||
user = decodedMessage['User']
|
||||
elif decodedMessage['Type'] == 'Name':
|
||||
user = await IOgetUserID(userName=decodedMessage['User'])
|
||||
message = await writeBindInfo(QQNumber=event.sender.user_id, user=user, gameType='IO')
|
||||
elif decodedMessage['Success'] is False:
|
||||
message = decodedMessage['Message']
|
||||
await matcher.send(message=message)
|
||||
|
||||
|
||||
@ioStats.handle()
|
||||
async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO')
|
||||
if decodedMessage['Success'] is True:
|
||||
if decodedMessage['Type'] == 'AT':
|
||||
bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO')
|
||||
if bindInfo['Hit'] is True:
|
||||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
|
||||
elif bindInfo['Hit'] is False:
|
||||
message = '未查询到绑定信息'
|
||||
elif decodedMessage['Type'] == 'ME':
|
||||
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
|
||||
if bindInfo['Hit'] is True:
|
||||
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
|
||||
elif bindInfo['Hit'] is False:
|
||||
message = '您还没有绑定账号'
|
||||
elif decodedMessage['Type'] == 'ID':
|
||||
message = await IOgenerateMessage(userID=decodedMessage['User'])
|
||||
elif decodedMessage['Type'] == 'Name':
|
||||
message = await IOgenerateMessage(userName=decodedMessage['User'])
|
||||
elif decodedMessage['Success'] is False:
|
||||
message = decodedMessage['Message']
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
@tosStats.handle()
|
||||
async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher):
|
||||
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS')
|
||||
if decodedMessage['Success'] is True:
|
||||
if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ':
|
||||
message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber'])
|
||||
elif decodedMessage['Type'] == 'ME':
|
||||
message = await TOSgenerateMessage(teaID=event.sender.user_id)
|
||||
elif decodedMessage['Type'] == 'Name':
|
||||
message = await TOSgenerateMessage(userName=decodedMessage['User'])
|
||||
elif decodedMessage['Success'] is False:
|
||||
message = decodedMessage['Message']
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
@topBind.handle()
|
||||
async def bindTOPUser(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.send(message='TODO')
|
||||
|
||||
|
||||
@topStats.handle()
|
||||
async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.send(message='TODO')
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "nonebot-plugin-tetris-stats"
|
||||
version = "0.1.1"
|
||||
version = "0.2.0"
|
||||
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
|
||||
authors = ["scdhh <wallfjjd@gmail.com>"]
|
||||
readme = "README.md"
|
||||
|
||||
Reference in New Issue
Block a user