Compare commits

...

27 Commits
0.1.0 ... 0.3.0

Author SHA1 Message Date
scdhh
88ba7cd3af 引入pylint对代码进行检查
代码规范:使用PEP8推荐的命名规范
删除Request模块,改为每个游戏的processor单独实现,因为每个游戏的api请求都不太一样
对于io_data_processor:
1. 引入了playwright以应对api套的cloudflare五秒盾
对于top_data_processor:
1. 添加了lxml和pandas的stubs库,并修复了所有type hint错误
对于sql:
1. 使用全局变量保存数据库连接对象,理论上运行一次只会连接一次数据库
2. 移动初始化数据库的hook函数到sql.py
其他:
优化代码
版本推进
2022-08-06 00:56:05 +00:00
5807a8c5fc 更新依赖 2022-07-29 04:48:54 +08:00
18b0d0033c 删除空行 2022-07-29 04:18:07 +08:00
9478ce9c71 修复top绑定时错误处理不存在的用户名的bug 2022-07-29 04:17:04 +08:00
f336b725d0 删除.pop 2022-07-29 04:03:04 +08:00
1d4b24a22b 修复top错误判断用户存在的bug 2022-07-29 04:02:42 +08:00
92619a9692 添加一些自称 2022-07-29 03:46:36 +08:00
35f799ee89 添加TOP查询的支持 2022-07-29 03:46:27 +08:00
scdhh
8fdffca530 Merge pull request #2 from Richard969/main
添加一些自称
2022-07-28 04:27:55 +08:00
Richard969
fd6e2e2367 添加一些自称 2022-07-28 02:18:37 +08:00
8ed1c20feb 修复tos查询游戏数据时没有有效记录会爆炸的bug 2022-07-25 04:11:42 +08:00
e5abc99590 tos增加一种命令前缀 2022-07-25 04:10:23 +08:00
d6449ddf8c 删除不需要的import 2022-07-24 17:35:41 +08:00
dcbab47833 修复命令前缀大写不能正确识别的bug 2022-07-24 17:25:06 +08:00
scdhh
e65c50e107 Update README.md 2022-07-24 17:25:06 +08:00
4525b84fc4 修复查AT时不能正确识别的bug 2022-07-24 17:25:06 +08:00
6207cdb3d9 修复IO输入用户名时大写查询爆炸的bug 2022-07-24 17:25:06 +08:00
f1291a9923 修复所有type hint(当时写的时候mypy炸了
使用神秘新架构
独立request模块
版本推进
fixed #1 [BUG] 茶服无法查询用户名内带有大写字母的用户
2022-07-24 17:24:37 +08:00
scdhh
0d2b37e78a Create codeql-analysis.yml 2022-05-26 07:38:53 +08:00
scdhh
a8998b01a7 Create LICENSE 2022-05-26 07:35:48 +08:00
df6cb71e3f 添加了对查bot自身的情况的特判 2022-05-25 18:00:09 +08:00
2a19b9fe4c 修复IO绑定用户时 用户不存在导致报错和错误绑定的bug 2022-05-25 14:27:27 +08:00
a77e190cf7 版本推进 2022-05-24 19:01:58 +08:00
c9e7923243 格式化代码 2022-05-24 18:58:19 +08:00
1569e71da9 更新README 2022-05-24 18:49:08 +08:00
da4c3c01a6 移除依赖项nb-cli,并添加nonebot2作为新依赖 2022-05-24 18:48:32 +08:00
9786a1325b 添加开发依赖autopep8 2022-05-24 18:42:55 +08:00
19 changed files with 1830 additions and 1069 deletions

72
.github/workflows/codeql-analysis.yml vendored Normal file
View File

@@ -0,0 +1,72 @@
# For most projects, this workflow file will not need changing; you simply need
# to commit it to your repository.
#
# You may wish to alter this file to override the set of languages analyzed,
# or to provide custom queries or build logic.
#
# ******** NOTE ********
# We have attempted to detect the languages in your repository. Please check
# the `language` matrix defined below to confirm you have the correct set of
# supported CodeQL languages.
#
name: "CodeQL"
on:
push:
branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
branches: [ main ]
schedule:
- cron: '17 6 * * 5'
jobs:
analyze:
name: Analyze
runs-on: ubuntu-latest
permissions:
actions: read
contents: read
security-events: write
strategy:
fail-fast: false
matrix:
language: [ 'python' ]
# CodeQL supports [ 'cpp', 'csharp', 'go', 'java', 'javascript', 'python', 'ruby' ]
# Learn more about CodeQL language support at https://aka.ms/codeql-docs/language-support
steps:
- name: Checkout repository
uses: actions/checkout@v3
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL
uses: github/codeql-action/init@v2
with:
languages: ${{ matrix.language }}
# If you wish to specify custom queries, you can do so here or in a config file.
# By default, queries listed here will override any specified in a config file.
# Prefix the list here with "+" to use these queries and those in the config file.
# Details on CodeQL's query packs refer to : https://docs.github.com/en/code-security/code-scanning/automatically-scanning-your-code-for-vulnerabilities-and-errors/configuring-code-scanning#using-queries-in-ql-packs
# queries: security-extended,security-and-quality
# Autobuild attempts to build any compiled languages (C/C++, C#, or Java).
# If this step fails, then you should remove it and run the build manually (see below)
- name: Autobuild
uses: github/codeql-action/autobuild@v2
# Command-line programs to run using the OS shell.
# 📚 See https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsrun
# If the Autobuild fails above, remove it and uncomment the following three lines.
# modify them (or add more) to build your code if your project, please refer to the EXAMPLE below for guidance.
# - run: |
# echo "Run, Build Application using script"
# ./location_of_script_within_repo/buildscript.sh
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

3
.gitignore vendored
View File

@@ -1 +1,4 @@
dist
test*
Untitled*
*copy*

31
.pylintrc Normal file
View File

@@ -0,0 +1,31 @@
[MAIN]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-allow-list=lxml
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
# for backward compatibility.)
extension-pkg-whitelist=lxml
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=ujson
disable=
C0114, # missing-module-docstring
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=120

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2022 scdhh
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -5,18 +5,31 @@ TETRIS Stats
目前支持
* [TETR.IO](https://tetr.io/)
* [茶服](https://teatube.cn/tos/)
计划支持
* [TOP](http://tetrisonline.pl/)
安装
----
* 使用 pip
* 使用 nb-cli
```
nb plugin install nonebot-plugin-tetris-stats
```
* 使用 pip
```
pip install nonebot-plugin-tetris-stats
```
* 对于 Windows
```
# CMD or PowerShell
playwright install firefox
```
* 对于 Linux
```
# 似乎 playwright官方 只支持 Ubuntu, 如果你是其他系统请自行尝试解决依赖问题
playwright install firefox
playwright install-deps firefox
```
使用
----
@@ -33,6 +46,7 @@ pip install nonebot-plugin-tetris-stats
* [NoneBot2](https://v2.nonebot.dev/)
* [OneBot](https://onebot.dev/)
* [go-cqhttp](https://github.com/Mrs4s/go-cqhttp/)
开源
----

View File

@@ -1,137 +0,0 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
# 封装请求函数
async def request(Url: str) -> dict[str, bool|dict[str, any]]:
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] IODataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
# 获取用户数据
async def getUserData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
userData = await request(Url=userDataUrl)
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserData: 参数错误')
return userData
# 获取Solo数据
async def getSoloData(userName: str = None, userID: str = None) -> dict[str, dict[str, any]]:
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
soloData = await request(Url = userSoloUrl)
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
soloData = await request(Url = userSoloUrl)
else:
raise ValueError('[TETRIS STATS] IODataProcessing.getSoloData: 参数错误')
return soloData
# 获取用户ID
async def getUserID(userData: dict = None, userName: str = None) -> str:
if userName is not None and userData is None:
userData = await getUserData(userName=userName)
elif userData is None and userName is None:
raise ValueError('[TETRIS STATS] IODataProcessing.getUserID: 参数错误')
return userData['Data']['data']['user']['_id']
# 获取排位统计数据
async def getLeagueStats(userData: dict) -> dict[str, bool|int|str|float]:
league = userData['Data']['data']['user']['league']
leagueStats = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round((leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round((leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
# 获取40L统计数据
async def getSprintStats(soloData: dict) -> dict[str, bool|int|float]:
sprintStats = {}
if soloData['Data']['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
sprintStats['Rank'] = False if soloData['Data']['data']['records']['40l']['rank'] is None else soloData['Data']['data']['records']['40l']['rank']
sprintStats['Time'] = round(soloData['Data']['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
# 获取Blitz统计数据
async def getBlitzStats(soloData: dict) -> dict[str, bool|int]:
blitzStats = {}
if soloData['Data']['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
blitzStats['Rank'] = False if soloData['Data']['data']['records']['blitz']['rank'] is None else soloData['Data']['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['Data']['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
# 生成消息
async def generateMessage(userName: str = None, userID: str = None) -> str:
userData, soloData = await gather(getUserData(userName = userName, userID = userID), getSoloData(userName = userName, userID = userID))
if userData['Status'] is False:
return '用户信息请求失败'
if userData['Success'] is False:
return f'用户信息请求错误:\n{userData["Data"]["error"]}'
userName = userData['Data']['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData)
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData['Status'] is False:
return f'{message}\nSolo统计数据请求失败'
sprintStats = await getSprintStats(soloData)
blitzStats = await getBlitzStats(soloData)
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -1,69 +0,0 @@
from re import match
# userBind
async def handleBindMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
return await checkName(message, gameType)
# statsQuery
async def handleStatsQueryMessage(message: str, gameType: str) -> dict[str, bool | str]:
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '', '', 'me']
message = (message.strip()).lower()
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if message.startswith(i):
message = message.replace(i, '')
message = message.strip()
break
if message == '' or message.isspace():
return {'Success': False, 'Type': None, 'Message': '用户名为空'}
else:
if message.startswith('[cq:at,qq='):
try:
QQNumber = int((str(message)).split(
'[cq:at,qq=')[1].split(']')[0])
except ValueError:
return {'Success': False, 'Type': None, 'Message': 'QQ号码不合法'}
else:
return {'Success': True, 'Type': 'AT', 'Message': None, 'QQNumber': QQNumber}
elif message in _ME:
return {'Success': True, 'Type': 'ME', 'Message': None}
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> dict[str, bool | str]:
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return {'Success': True, 'Type': 'ID', 'Message': None, 'User': name}
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return {'Success': True, 'Type': 'Name', 'Message': None, 'User': name}
elif name.isdigit() is True:
return {'Success': True, 'Type': 'QQ', 'Message': None, 'QQNumber': name}
else:
return {'Success': False, 'Type': None, 'Message': '用户名不合法'}

View File

@@ -1,52 +0,0 @@
from nonebot.log import logger
import sqlite3
import os
_DB_FILE = 'data/nonebot-plugin-tetris-stats/data.db'
# 初始化数据库
async def initDB():
if not os.path.exists(os.path.dirname(_DB_FILE)):
os.makedirs(os.path.dirname(_DB_FILE))
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
db.commit()
db.close()
logger.info('数据库初始化完成')
# 查询绑定信息
async def queryBindInfo(QQNumber: int, gameType: str) -> dict:
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute(f'SELECT USER FROM {gameType}BIND WHERE QQ = {QQNumber}')
user = cursor.fetchone()
db.commit()
db.close()
if user is None:
return {'Hit': False, 'User': None}
else:
return {'Hit': True, 'User': user[0]}
# 写入绑定信息
async def writeBindInfo(QQNumber: int, user: str, gameType: str) -> str:
info = await queryBindInfo(QQNumber, gameType)
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
if info['Hit'] is True:
cursor.execute(
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
message = '更新成功'
elif info['Hit'] is False:
cursor.execute(
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
message = '绑定成功'
db.commit()
db.close()
return message

View File

@@ -1,143 +0,0 @@
from nonebot.log import logger
from asyncio import gather
import aiohttp
# 封装请求函数
async def request(Url: str) -> dict[str, bool|dict[str, any]]:
data = {}
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data['Status'] = True
data['Data'] = await resp.json()
data['Success'] = data['Data']['success']
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] TOSDataProcessing.request: 请求错误\n{e}')
data['Status'] = False
finally:
return data
# 获取用户信息
async def getUserInfo(userName: str = None, teaID: int = None) -> dict[str, bool|dict[str, any]]:
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
userInfo = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
userInfo = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserInfo: 参数错误')
return userInfo
# 获取用户数据
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> dict[str, bool|dict[str, any]]:
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
userData = await request(Url=userDataUrl)
elif teaID is not None and userName is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
userData = await request(Url=userDataUrl)
else:
raise ValueError('[TETRIS STATS] TOSDataProcessing.getUserData: 参数错误')
return userData
# 获取Rank数据
async def getRankStats(userInfo: dict) -> dict[str, bool|float]:
rankStats = {}
if int(userInfo['Data']['data']['rankedGames']) == 0:
rankStats['Played'] = False
else:
rankStats['Played'] = True
rankStats['Rating'] = round(float(userInfo['Data']['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['Data']['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['Data']['data']['volNow']), 3)
return rankStats
# 获取游戏数据
async def getGameData(userData: dict) -> dict[str, bool|int|float]:
gameData = {}
if userData['Data']['data'] == []:
gameData['Played'] = False
else:
gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['Data']['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶不计算没挖掘的局即使apm和lpm也如此
if i['dig'] is None:
break
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weightedTotalLpm += lpm * time
weightedTotalApm += apm * time
weightedTotalAdpm += adpm * time
weightedTotalTime += time
num += 1
if num == 50:
break
gameData['NUM'] = num
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['Data']['data']内有50个局, 则继续往前获取信息
return gameData
# 获取PB数据
async def getPBData(userInfo: dict) -> dict[str, bool|float|str]:
PBData = {}
if int(userInfo['Data']['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False
else:
PBData['Sprint'] = round(float(userInfo['Data']['data']['PBSprint']) / 1000, 2)
if int(userInfo['Data']['data']['PBMarathon']) == 0:
PBData['Marathon'] = False
else:
PBData['Marathon'] = userInfo['Data']['data']['PBMarathon']
if int(userInfo['Data']['data']['PBChallenge']) == 0:
PBData['Challenge'] = False
else:
PBData['Challenge'] = userInfo['Data']['data']['PBChallenge']
return PBData
# 生成消息
async def generateMessage(userName: str = None, teaID: int = None) -> str:
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo['Status'] is False:
return f'用户信息请求失败'
if userInfo['Success'] is False:
return f'用户信息请求错误:\n{userInfo["Data"]["error"]}'
rankStats = await getRankStats(userInfo)
PBData = await getPBData(userInfo)
message = ''
if rankStats['Played'] is False:
message += f'用户 {userInfo["Data"]["data"]["name"]}{userInfo["Data"]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True:
message += f'用户 {userInfo["Data"]["data"]["name"]} ({userInfo["Data"]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData['Status'] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData['Status'] is True:
gameData = await getGameData(userData)
if gameData['Played'] is False:
message += ', 暂无游戏数据'
elif gameData['Played'] is True:
message += f', 最近 {gameData["NUM"]} 局数据'
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
message += f'\nAPM{gameData["APM"]} ( x{gameData["APL"]} )'
message += f'\nADPM{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
if PBData['Sprint'] is not False:
message += f'\n40L: {PBData["Sprint"]}s'
if PBData['Marathon']is not False:
message += f'\nMarathon: {PBData["Marathon"]}'
if PBData['Challenge'] is not False:
message += f'\nChallenge: {PBData["Challenge"]}'
return message

View File

@@ -1,86 +1 @@
from re import I
from nonebot import get_driver, on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from .MessageAnalyzer import *
from .SQL import *
from .IODataProcessing import generateMessage as IOgenerateMessage
from .IODataProcessing import getUserID as IOgetUserID
from .TOSDataProcessing import generateMessage as TOSgenerateMessage
driver = get_driver()
@driver.on_startup
async def startUP():
await initDB()
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
tosStats = on_regex(pattern=r'^tos查|^tostats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
topBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
topStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@ioBind.handle()
async def bindIOUser(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'ID':
user = decodedMessage['User']
elif decodedMessage['Type'] == 'Name':
user = await IOgetUserID(userName=decodedMessage['User'])
message = await writeBindInfo(QQNumber=event.sender.user_id, user=user, gameType='IO')
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.send(message=message)
@ioStats.handle()
async def handleIOStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='IO')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT':
bindInfo = await queryBindInfo(QQNumber=decodedMessage['QQNumber'], gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '未查询到绑定信息'
elif decodedMessage['Type'] == 'ME':
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo['Hit'] is True:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await IOgenerateMessage(userID=bindInfo["User"])}')
elif bindInfo['Hit'] is False:
message = '您还没有绑定账号'
elif decodedMessage['Type'] == 'ID':
message = await IOgenerateMessage(userID=decodedMessage['User'])
elif decodedMessage['Type'] == 'Name':
message = await IOgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@tosStats.handle()
async def handleTOSStatsQuery(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=str(event.get_message()), gameType='TOS')
if decodedMessage['Success'] is True:
if decodedMessage['Type'] == 'AT' or decodedMessage['Type'] == 'QQ':
message = await TOSgenerateMessage(teaID=decodedMessage['QQNumber'])
elif decodedMessage['Type'] == 'ME':
message = await TOSgenerateMessage(teaID=event.sender.user_id)
elif decodedMessage['Type'] == 'Name':
message = await TOSgenerateMessage(userName=decodedMessage['User'])
elif decodedMessage['Success'] is False:
message = decodedMessage['Message']
await matcher.finish(message=message)
@topBind.handle()
async def bindTOPUser(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')
@topStats.handle()
async def handleTOPStatsQuery(event: MessageEvent, matcher: Matcher):
await matcher.send(message='TODO')
from . import game_data_processor

View File

@@ -0,0 +1 @@
from . import io_data_processor, top_data_processor, tos_data_processor

View File

@@ -0,0 +1,262 @@
from typing import Any
from asyncio import gather
from re import I
from playwright.async_api import Browser, async_playwright
from ujson import loads, JSONDecodeError
import aiohttp
from nonebot import on_regex, get_driver
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
from ..utils.sql import query_bind_info, write_bind_info
_BROWSER: Browser | None = None
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
driver = get_driver()
@IOBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
if decoded_message[0] == 'ID':
user_id_stats = await check_user_id(user_id=decoded_message[1][1])
if user_id_stats[0] is False:
await matcher.finish(user_id_stats[1])
else:
user_id = decoded_message[1][1]
elif decoded_message[0] == 'Name':
user_data = await get_user_data(user_name=decoded_message[1][1])
if user_data[0] is False:
await matcher.finish('用户信息请求失败')
elif user_data[1] is False:
await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}')
else:
user_id = await get_user_id(user_data=user_data[2])
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id,
user=user_id,
game_type='IO'))
@IOStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
elif decoded_message[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
elif decoded_message[0] == 'ID':
message = await generate_message(user_id=decoded_message[1][1])
elif decoded_message[0] == 'Name':
message = await generate_message(user_name=decoded_message[1][1])
await matcher.finish(message=message)
@driver.on_shutdown
async def _():
if isinstance(_BROWSER, Browser):
await _BROWSER.close()
async def init_playwright() -> Browser:
'''初始化playwright'''
global _BROWSER
p = await async_playwright().start()
_BROWSER = await p.firefox.launch()
return _BROWSER
async def get_browser() -> Browser:
'''获取浏览器对象'''
return _BROWSER or await init_playwright()
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
'''请求api'''
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.json()
return (True, data['success'], data)
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(f'请求错误\n{error}')
return (False, False, {})
except aiohttp.client_exceptions.ContentTypeError:
# 如果有五秒盾就用firefox硬穿
browser = await get_browser()
page = await browser.new_page()
await page.goto(url)
attempts = 0
while True:
text = await page.locator("body").text_content()
if text is None:
continue
attempts += 1
if await page.title() == 'Please Wait... | Cloudflare':
break
try:
data = loads(text)
except JSONDecodeError:
await page.wait_for_timeout(1000)
else:
return (True, data['success'], data)
if attempts >= 60:
break
await page.close()
return (True, False, {'error': '绕过五秒盾失败'})
async def get_user_data(user_name: str = None,
user_id: str = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据'''
if user_name is not None and user_id is None:
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
elif user_name is None and user_id is not None:
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_data_url)
async def get_solo_data(user_name: str = None,
user_id: str = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取Solo数据'''
if user_name is not None and user_id is None:
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
elif user_name is None and user_id is not None:
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_solo_url)
async def get_user_id(user_data: dict) -> str:
'''获取用户ID'''
return user_data['data']['user']['_id']
async def check_user_id(user_id: str) -> tuple[bool, str]:
'''检查用户ID是否有效'''
user_data = await get_user_data(user_id=user_id)
if user_data[0] is False:
return (False, '用户信息请求失败')
if user_data[1] is False:
return (False, f'用户信息请求错误:\n{user_data[2]["error"]}')
if user_id == user_data[2]['data']['user']['_id']:
return (True, '')
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下x')
async def get_league_stats(user_data: dict) -> dict[str, Any]:
'''获取排位统计数据'''
league = user_data['data']['user']['league']
league_stats: dict[str, Any] = {}
if league['gamesplayed'] != 0:
league_stats['PPS'] = league['pps']
league_stats['APM'] = league['apm']
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] == -1:
league_stats['Rank'] = None
else:
league_stats['Rating'] = round(league['rating'], 2)
league_stats['Glicko'] = round(league['glicko'], 2)
league_stats['RD'] = round(league['rd'], 2)
league_stats['Standing'] = league['standing']
league_stats['LPM'] = round((league['pps'] * 24), 2)
league_stats['APL'] = round(
(league_stats['APM'] / league_stats['LPM']), 2)
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
league_stats['ADPL'] = round(
(league_stats['ADPM'] / league_stats['LPM']), 2)
return league_stats
async def get_sprint_stats(solo_data: dict) -> dict[str, Any]:
'''获取40L统计数据'''
sprint_stats = {}
solo = solo_data['data']['records']['40l']
if solo['record'] is not None:
sprint_stats['Time'] = round(
solo['record']['endcontext']['finalTime'] / 1000, 2)
if solo['rank'] is not None:
sprint_stats['Rank'] = solo['rank']
return sprint_stats
async def get_blitz_stats(solo_data: dict) -> dict[str, Any]:
'''获取Blitz统计数据'''
blitz_stats = {}
blitz = solo_data['data']['records']['blitz']
if blitz['record'] is not None:
blitz_stats['Score'] = blitz['record']['endcontext']['score']
if blitz['rank'] is not None:
blitz_stats['Rank'] = blitz['rank']
return blitz_stats
async def generate_message(user_name: str = None, user_id: str = None) -> str:
'''生成消息'''
user_data, solo_data = await gather(get_user_data(user_name=user_name, user_id=user_id),
get_solo_data(user_name=user_name, user_id=user_id))
if user_data[0] is False:
return '用户信息请求失败'
if user_data[1] is False:
return f'用户信息请求错误:\n{user_data[2]["error"]}'
user_name = user_data[2]['data']['user']['username'].upper()
league_stats = await get_league_stats(user_data[2])
message = ''
if not league_stats:
message += f'用户 {user_name} 没有排位统计数据'
else:
if league_stats['Rank'] is None:
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
else:
if league_stats['Rank'] == 'Z':
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
else:
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
if league_stats["VS"] != 0:
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
if solo_data[0] is False:
return f'{message}\nSolo统计数据请求失败'
if solo_data[1] is False:
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
sprint_stats, blitz_stats = await gather(get_sprint_stats(solo_data[2]),
get_blitz_stats(solo_data[2]))
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
return message

View File

@@ -0,0 +1,182 @@
from typing import Any
from re import I
from lxml import etree
from pandas import read_html
import aiohttp
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
from ..utils.sql import query_bind_info, write_bind_info
TOPBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
TopStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@TOPBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_bind_message(message=event.raw_message, game_type='TOP')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'Name':
user_data = await get_user_data(decoded_message[1][1])
if user_data[0] is False:
await matcher.finish('用户信息请求失败')
else:
if await check_user(user_data[1]) is False:
await matcher.finish('用户不存在')
user_name = await get_user_name(user_data[1])
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP'))
@TopStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOP')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
elif decoded_message[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
elif decoded_message[0] == 'Name':
message = await generate_message(decoded_message[1][1])
await matcher.finish(message=message)
async def get_user_data(user_name: str) -> tuple[bool, str]:
'''获取用户信息'''
url = f'http://tetrisonline.pl/top/profile.php?user={user_name}'
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return (True, await resp.text())
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(error)
return (False, '')
async def check_user(user_data: str) -> bool:
'''如果用户存在返回True, 如果用户不存在返回False'''
return user_data.find('user not found!') == -1
async def get_user_name(user_data: str) -> str:
'''获取用户名'''
data = etree.HTML(user_data).xpath('//div[@class="mycontent"]/h1/text()')
if isinstance(data, list):
return str(data[0]).replace('\'s profile', '')
raise TypeError('预期外行为, 请上报GitHub')
async def get_game_stats(user_data: str) -> dict[str, dict[str, Any]]:
'''获取游戏统计数据'''
game_stats: dict[str, Any] = {'24H': {}, 'All': {}}
html = etree.HTML(user_data)
data = html.xpath('//div[@class="mycontent"]//text()')
if isinstance(data, list):
for i in data:
if not isinstance(i, str):
i = str(i)
i = i.strip()
if i.startswith('lpm:'):
game_stats['24H']['LPM'] = i.replace('lpm:', '').strip()
if i.startswith('apm:'):
game_stats['24H']['APM'] = i.replace('apm:', '').strip()
if 'LPM' in game_stats['24H'] and 'APM' in game_stats['24H']:
break
else:
raise TypeError('预期外行为, 请上报GitHub')
# 如果没有24H统计数据
if game_stats['24H'].get('LPM') in [None, ''] or game_stats['24H'].get('APM') in [None, '']:
game_stats['24H'].pop('LPM', None)
game_stats['24H'].pop('APM', None)
else:
game_stats['24H']['PPS'] = round(
float(game_stats['24H']['LPM']) / 24, 2)
game_stats['24H']['APL'] = round(
float(game_stats['24H']['APM']) / float(game_stats['24H']['LPM']), 2)
game_stats['24H']['LPM'] = round(float(game_stats['24H']['LPM']), 2)
game_stats['24H']['APM'] = round(float(game_stats['24H']['APM']), 2)
table = html.xpath('//table')
if isinstance(table, list):
if isinstance(table[0], etree._Element):
stats_table = etree.tostring(table[0], encoding='utf-8').decode()
df = read_html(stats_table, encoding='utf-8', header=0)[0]
results = df.T.to_dict().values()
if results:
game_stats['All']['LPM'] = 0
game_stats['All']['APM'] = 0
for i in results:
if isinstance(i, dict):
game_stats['All']['LPM'] += i['lpm']
game_stats['All']['APM'] += i['apm']
game_stats['All']['LPM'] = game_stats['All']['LPM'] / \
len(results)
game_stats['All']['APM'] = game_stats['All']['APM'] / \
len(results)
game_stats['All']['PPS'] = round(
game_stats['All']['LPM'] / 24, 2)
game_stats['All']['APL'] = round(
float(game_stats['All']['APM']) / float(game_stats['All']['LPM']), 2)
game_stats['All']['LPM'] = round(
float(game_stats['All']['LPM']), 2)
game_stats['All']['APM'] = round(
float(game_stats['All']['APM']), 2)
else:
raise TypeError('预期外行为, 请上报GitHub')
return game_stats
async def generate_message(user_name: str) -> str:
'''生成消息'''
user_data = await get_user_data(user_name)
if user_data[0] is False:
return '用户信息请求失败'
if await check_user(user_data[1]) is False:
return '用户不存在'
user_name = await get_user_name(user_data[1])
game_stats = await get_game_stats(user_data[1])
message = ''
if game_stats['24H'] and game_stats['All']:
message += f'用户 {user_name} 24小时内统计数据为: '
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
message += '\n历史统计数据为: '
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
elif game_stats['24H'] and not game_stats['All']:
message += f'用户 {user_name} 24小时内统计数据为: '
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
message += '\n暂无历史统计数据'
message += '\n( 这理论上不该存在, 如果你看到了, 请联系bot主人查看后台'
logger.error(f'老实说这个不算Error, 但是理论上不应该有, 如果你看到了这条日志, 我希望你能来Github发个issue\
user_name: {user_name}\
user_data: {user_data}\
game_stats: {game_stats}')
elif not game_stats['24H'] and game_stats['All']:
message += f'用户 {user_name} 暂无24小时内统计数据, 历史统计数据为: '
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
else:
message += f'用户 {user_name} 暂无24小时内统计数据, 暂无历史统计数据'
return message

View File

@@ -0,0 +1,164 @@
from typing import Any
from asyncio import gather
from re import I
import aiohttp
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from ..utils.message_analyzer import handle_stats_query_message
TOSStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
@TOSStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOS')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ':
if decoded_message[1][1] == event.self_id:
await matcher.finish(message='不能查询bot的信息')
message = await generate_message(tea_id=decoded_message[1][1])
elif decoded_message[0] == 'ME':
message = await generate_message(tea_id=event.sender.user_id)
elif decoded_message[0] == 'Name':
message = await generate_message(user_name=decoded_message[1][1])
await matcher.finish(message=message)
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
'''请求api'''
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.json()
return (True, data['success'], data)
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(f'请求错误\n{error}')
return (False, False, {})
async def get_user_info(user_name: str = None,
tea_id: int = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户信息'''
if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}'
elif user_name is None and tea_id is not None:
user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_data_url)
async def get_user_data(user_name: str = None,
tea_id: int = None,
other_parameter: str = ''
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据'''
if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}'
elif user_name is None and tea_id is not None:
user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_data_url)
async def get_rank_stats(user_info: dict) -> dict[str, float]:
'''获取Rank数据'''
data = user_info['data']
if int(data['rankedGames']) != 0:
rank_stats = {}
rank_stats['Rating'] = round(float(data['ratingNow']), 2)
rank_stats['RD'] = round(float(data['rdNow']), 2)
rank_stats['Vol'] = round(float(data['volNow']), 3)
return rank_stats
async def get_game_data(user_data: dict) -> dict[str, int | float]:
'''获取游戏数据'''
if user_data['data'] != []:
game_data: dict[str, int | float] = {}
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = num = 0
for i in user_data['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶:不计算没挖掘的局, 即使apm和lpm也如此
if i['dig'] is None:
continue
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weighted_total_lpm += lpm * time
weighted_total_apm += apm * time
weighted_total_adpm += adpm * time
total_time += time
num += 1
if num == 50:
break
if num > 0:
game_data['NUM'] = num
game_data['LPM'] = round((weighted_total_lpm / total_time), 2)
game_data['APM'] = round((weighted_total_apm / total_time), 2)
game_data['ADPM'] = round((weighted_total_adpm / total_time), 2)
game_data['PPS'] = round((game_data['LPM'] / 24), 2)
game_data['APL'] = round((game_data['APM'] / game_data['LPM']), 2)
game_data['ADPL'] = round(
(game_data['ADPM'] / game_data['LPM']), 2)
game_data['VS'] = round((game_data['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
return game_data
async def get_pb_data(user_info: dict) -> dict[str, float | str]:
'''获取PB数据'''
pb_data: dict[str, float | str] = {}
data = user_info['data']
if int(data['PBSprint']) != 2147483647:
pb_data['Sprint'] = round(float(data['PBSprint']) / 1000, 2)
if int(data['PBMarathon']) != 0:
pb_data['Marathon'] = data['PBMarathon']
if int(data['PBChallenge']) != 0:
pb_data['Challenge'] = data['PBChallenge']
return pb_data
async def generate_message(user_name: str = None, tea_id: int = None) -> str:
'''生成消息'''
user_info, user_data = await gather(get_user_info(user_name=user_name, tea_id=tea_id),
get_user_data(user_name=user_name, tea_id=tea_id))
if user_info[0] is False:
return '用户信息请求失败'
if user_info[1] is False:
return f'用户信息请求错误:\n{user_info[2]["error"]}'
rank_stats, pb_data = await gather(get_rank_stats(user_info[2]), get_pb_data(user_info[2]))
message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) '
if not rank_stats:
message += '暂无段位统计数据'
else:
message += f', 段位分 {rank_stats["Rating"]}±{rank_stats["RD"]} ({rank_stats["Vol"]}) '
if user_data[0] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif user_data[1] is False:
message = f'{message.rstrip()}\n游戏数据请求错误:\n{user_data[2]["error"]}'
else:
game_data = await get_game_data(user_data[2])
if not game_data:
message += ', 暂无游戏数据'
else:
message += f', 最近 {game_data["NUM"]} 局数据'
message += f'\nL\'PM: {game_data["LPM"]} ( {game_data["PPS"]} pps )'
message += f'\nAPM{game_data["APM"]} ( x{game_data["APL"]} )'
message += f'\nADPM{game_data["ADPM"]} ( x{game_data["ADPL"]} ) ( {game_data["VS"]}vs )'
message += f'\n40L: {pb_data["Sprint"]}s' if 'Sprint' in pb_data else ''
message += f'\nMarathon: {pb_data["Marathon"]}' if 'Marathon' in pb_data else ''
message += f'\nChallenge: {pb_data["Challenge"]}' if 'Challenge' in pb_data else ''
return message

View File

@@ -0,0 +1 @@
from . import message_analyzer, sql

View File

@@ -0,0 +1,75 @@
from re import match, sub
async def handle_bind_message(message: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_cmd_aliases = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _cmd_aliases[game_type]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError('预期外行为, 请上报GitHub')
if message == '' or message.isspace():
return (None, ('用户名为空', None))
return await check_name(message, game_type)
async def handle_stats_query_message(message: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_cmd_aliases = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_me = ['', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷', '鄙人', '寡人',
'小生', '贫僧', '本人', '', '', '', '', '', 'me', '洒家', '在下', '', '人家',
'本小姐', '老夫', '老子', '', '本尊', '', '拙者', '', '', '自分', '吾輩', '我輩', '',
'己等', '俺等', '此方', '', '', '劳资', '本宝宝', '', '本喵', 'watashi', 'i', 'myself',
'self', 'oneself']
# 剔除命令前缀
for i in _cmd_aliases[game_type]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
if message == '' or message.isspace():
return (None, ('用户名为空', None))
if message.startswith('[CQ:at,qq='):
try:
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
except ValueError:
return (None, ('QQ号码不合法', None))
else:
return ('AT', (None, user))
elif message in _me:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return ('ME', (None, None))
else:
return await check_name(message, game_type)
async def check_name(name: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if game_type == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return ('ID', (None, name))
if match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return ('Name', (None, name.lower()))
return (None, ('用户名不合法', None))
if game_type == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return ('Name', (None, name))
return (None, ('用户名不合法', None))
if game_type == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return ('Name', (None, name))
if name.isdigit() is True:
return ('QQ', (None, name))
return (None, ('用户名不合法', None))
return (None, ('游戏类型错误', None))

View File

@@ -0,0 +1,79 @@
from sqlite3 import connect, Connection
import os
from nonebot import get_driver
from nonebot.log import logger
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
_DB: Connection | None = None
driver = get_driver()
@driver.on_startup
async def _():
'''初始化数据库'''
await init_db()
@driver.on_shutdown
async def _():
if isinstance(_DB, Connection):
await _DB.close()
async def init_db() -> Connection:
'''初始化数据库'''
if not os.path.exists(os.path.dirname(_DB_FILE)):
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
os.rename('data/nonebot-plugin-tetris-stats',
os.path.dirname(_DB_FILE))
else:
os.makedirs(os.path.dirname(_DB_FILE))
global _DB
_DB = connect(_DB_FILE)
cursor = _DB.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
_DB.commit()
logger.info('数据库初始化完成')
return _DB
async def get_db() -> Connection:
'''获取数据库对象'''
return _DB or await init_db()
async def query_bind_info(qq_number: str | int, game_type: str) -> str | None:
'''查询绑定信息'''
db = await get_db()
cursor = db.cursor()
cursor.execute(f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
user = cursor.fetchone()
db.commit()
if user is None:
return None
return user[0]
async def write_bind_info(qq_number: str | int, user: str, game_type: str) -> str:
'''写入绑定信息'''
bind_info = await query_bind_info(qq_number, game_type)
db = await get_db()
cursor = db.cursor()
if bind_info is not None:
cursor.execute(
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
message = '更新成功'
elif bind_info is None:
cursor.execute(
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
message = '绑定成功'
db.commit()
return message

1483
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "nonebot-plugin-tetris-stats"
version = "0.1.0"
version = "0.3.0"
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
authors = ["scdhh <wallfjjd@gmail.com>"]
readme = "README.md"
@@ -9,14 +9,23 @@ repository = "https://github.com/shoucandanghehe/nonebot-plugin-tetris-stats"
license = "MIT"
[tool.poetry.dependencies]
python = "^3.10"
nb-cli = "^0.6.7"
python = "^3.10,<3.11"
nonebot-adapter-onebot = "^2.0.0-beta.1"
aiohttp = "^3.8.1"
asyncio = "^3.4.3"
nonebot2 = "^2.0.0-beta.3"
lxml = "^4.9.1"
pandas = "^1.4.3"
playwright = "^1.24.1"
ujson = "^5.4.0"
[tool.poetry.dev-dependencies]
mypy = "^0.950"
mypy = "^0.971"
autopep8 = "^1.6.0"
pylint = "^2.14.5"
types-ujson = "^5.4.0"
lxml-stubs = "^0.4.0"
pandas-stubs = "^1.4.3"
[build-system]
requires = ["poetry-core>=1.0.0"]