Compare commits

...

25 Commits
0.2.0 ... 0.3.2

Author SHA1 Message Date
61b671354b 版本推进 2022-08-10 14:11:17 +08:00
3f24dc2f1d Add Release CI 2022-08-10 14:10:13 +08:00
scdhh
c63bb52540 Merge pull request #4 from shoucandanghehe/dependabot/pip/autopep8-1.7.0
Bump autopep8 from 1.6.0 to 1.7.0
2022-08-10 04:37:38 +08:00
dependabot[bot]
a0bc307aa6 Bump autopep8 from 1.6.0 to 1.7.0
Bumps [autopep8](https://github.com/hhatto/autopep8) from 1.6.0 to 1.7.0.
- [Release notes](https://github.com/hhatto/autopep8/releases)
- [Commits](https://github.com/hhatto/autopep8/compare/v1.6.0...v1.7.0)

---
updated-dependencies:
- dependency-name: autopep8
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-08-09 15:50:44 +00:00
d3258c2bb7 整理代码 2022-08-09 02:50:06 +08:00
b320a23c77 将数据库操作封装成类,并添加数据库路径的config选项 2022-08-09 02:12:29 +08:00
69e4cb97e3 import config 2022-08-09 01:39:45 +08:00
c6f83230c8 修复写入缓存时 传递了错误的变量的bug 2022-08-08 13:44:16 +08:00
e473dca4df 对于io_data_processor:
1.将请求相关封装成了Request类
2.新增了获取cookies后使用aiohttp请求的机制,理论上可以提速+减少不必要的性能开销,并且将cookies存储至cache
3.整理代码
新增config.py,用于配置cache路径
添加依赖项Brotli
2022-08-08 13:32:31 +08:00
scdhh
68028cf3d9 Merge pull request #3 from shoucandanghehe/dependabot/pip/pandas-stubs-1.4.3.220807
Bump pandas-stubs from 1.4.3.220801 to 1.4.3.220807
2022-08-08 13:22:22 +08:00
dependabot[bot]
10929bab03 Bump pandas-stubs from 1.4.3.220801 to 1.4.3.220807
Bumps [pandas-stubs](https://github.com/pandas-dev/pandas-stubs) from 1.4.3.220801 to 1.4.3.220807.
- [Release notes](https://github.com/pandas-dev/pandas-stubs/releases)
- [Changelog](https://github.com/pandas-dev/pandas-stubs/blob/main/docs/release_procedure.md)
- [Commits](https://github.com/pandas-dev/pandas-stubs/compare/v1.4.3.220801...v1.4.3.220807)

---
updated-dependencies:
- dependency-name: pandas-stubs
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2022-08-08 01:27:02 +00:00
scdhh
8e5e7d3c33 Create dependabot.yml 2022-08-08 08:56:40 +08:00
ec04af21d6 版本推进 2022-08-07 00:43:31 +08:00
2aef281e8c 修复正确请求到api后没有正确关闭浏览器标签页导致内存溢出的bug 2022-08-07 00:42:59 +08:00
2be62140ac 修复关闭数据库时错误使用await的错误 2022-08-07 00:39:42 +08:00
scdhh
88ba7cd3af 引入pylint对代码进行检查
代码规范:使用PEP8推荐的命名规范
删除Request模块,改为每个游戏的processor单独实现,因为每个游戏的api请求都不太一样
对于io_data_processor:
1. 引入了playwright以应对api套的cloudflare五秒盾
对于top_data_processor:
1. 添加了lxml和pandas的stubs库,并修复了所有type hint错误
对于sql:
1. 使用全局变量保存数据库连接对象,理论上运行一次只会连接一次数据库
2. 移动初始化数据库的hook函数到sql.py
其他:
优化代码
版本推进
2022-08-06 00:56:05 +00:00
5807a8c5fc 更新依赖 2022-07-29 04:48:54 +08:00
18b0d0033c 删除空行 2022-07-29 04:18:07 +08:00
9478ce9c71 修复top绑定时错误处理不存在的用户名的bug 2022-07-29 04:17:04 +08:00
f336b725d0 删除.pop 2022-07-29 04:03:04 +08:00
1d4b24a22b 修复top错误判断用户存在的bug 2022-07-29 04:02:42 +08:00
92619a9692 添加一些自称 2022-07-29 03:46:36 +08:00
35f799ee89 添加TOP查询的支持 2022-07-29 03:46:27 +08:00
scdhh
8fdffca530 Merge pull request #2 from Richard969/main
添加一些自称
2022-07-28 04:27:55 +08:00
Richard969
fd6e2e2367 添加一些自称 2022-07-28 02:18:37 +08:00
22 changed files with 2030 additions and 833 deletions

11
.github/dependabot.yml vendored Normal file
View File

@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "daily"

25
.github/workflows/Release.yml vendored Normal file
View File

@@ -0,0 +1,25 @@
name: Release CI
on:
push:
tags:
- "*"
jobs:
release:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install Poetry
shell: bash
run: |
pip install poetry
- name: Build
shell: bash
run: |
poetry install
poetry env use python
poetry publish --build -u ${{ secrets.USERNAME }} -p ${{ secrets.PASSWORD }} -n

31
.pylintrc Normal file
View File

@@ -0,0 +1,31 @@
[MAIN]
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code.
extension-pkg-allow-list=lxml, pydantic
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
# for backward compatibility.)
extension-pkg-whitelist=lxml, pydantic
# List of module names for which member attributes should not be checked
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis). It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=ujson
disable=
C0114, # missing-module-docstring
[MESSAGES CONTROL]
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
# multiple time (only on the command line, not in the configuration file where
# it should appear only once). See also the "--disable" option for examples.
enable=c-extension-no-member
[FORMAT]
# Maximum number of characters on a single line.
max-line-length=120

View File

@@ -5,24 +5,30 @@ TETRIS Stats
目前支持
* [TETR.IO](https://tetr.io/)
* [茶服](https://teatube.cn/tos/)
计划支持
* [TOP](http://tetrisonline.pl/)
安装
----
* 使用 nb-cli(推荐)
* 使用 nb-cli
```
nb plugin install nonebot-plugin-tetris-stats
```
* 使用 pip(不推荐)
* 使用 pip
```
pip install nonebot-plugin-tetris-stats
# 修改bot.py
```
* 对于 Windows
```
# CMD or PowerShell
playwright install firefox
```
* 对于 Linux
```
# 似乎 playwright官方 只支持 Ubuntu, 如果你是其他系统请自行尝试解决依赖问题
playwright install firefox
playwright install-deps firefox
```
使用

View File

@@ -1,211 +0,0 @@
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from typing import Any, Mapping
from asyncio import gather
from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import handleBindMessage, handleStatsQueryMessage
from ..Utils.SQL import queryBindInfo, writeBindInfo
ioBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
ioStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
@ioBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleBindMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
if decodedMessage[0] == 'ID':
userIDStats = await checkUserID(userID=decodedMessage[1][1])
if userIDStats[0] is False:
await matcher.finish(userIDStats[1])
else:
userID = decodedMessage[1][1]
elif decodedMessage[0] == 'Name':
userData = await getUserData(userName=decodedMessage[1][1])
if userData[0] is False:
await matcher.finish('用户信息请求失败')
elif userData[1] is False:
await matcher.finish(f'用户信息请求错误:\n{userData[2]["error"]}')
else:
userID = await getUserID(userData=userData[2])
if event.sender.user_id is None: # 理论上是不会有None出现的ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await writeBindInfo(QQNumber=event.sender.user_id, user=userID, gameType='IO'))
@ioStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='IO')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
elif decodedMessage[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bindInfo = await queryBindInfo(QQNumber=decodedMessage[1][1], gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败请联系bot主人')
bindInfo = await queryBindInfo(QQNumber=event.sender.user_id, gameType='IO')
if bindInfo is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息,不能保证查询到的用户为本人\n{await generateMessage(userID=bindInfo)}')
elif decodedMessage[0] == 'ID':
message = await generateMessage(userID=decodedMessage[1][1])
elif decodedMessage[0] == 'Name':
message = await generateMessage(userName=decodedMessage[1][1])
await matcher.finish(message=message)
async def getUserData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据
if userName is not None and userID is None:
userDataUrl = f'https://ch.tetr.io/api/users/{userName}'
elif userName is None and userID is not None:
userDataUrl = f'https://ch.tetr.io/api/users/{userID}'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getSoloData(userName: str = None, userID: str = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取Solo数据
if userName is not None and userID is None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userName}/records'
elif userName is None and userID is not None:
userSoloUrl = f'https://ch.tetr.io/api/users/{userID}/records'
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.getSoloData: 预期外行为请上报GitHub')
return await request(Url=userSoloUrl)
async def getUserID(userData: dict) -> str:
return userData['data']['user']['_id']
async def checkUserID(userID: str) -> tuple[bool, str]:
userData = await getUserData(userID=userID)
if userData[0] is False:
return (False, '用户信息请求失败')
elif userData[1] is False:
return (False, f'用户信息请求错误:\n{userData[2]["error"]}')
elif userID == userData[2]['data']['user']['_id']:
return (True, '')
else:
raise ValueError(
'[TETRIS STATS] IODataProcessing.checkUserID: 服务器返回的userID和用户提供的不一致这种情况理论上不应该发生以防万一还是写一下x')
async def getLeagueStats(userData: dict) -> dict[str, Any]:
# 获取排位统计数据
league = userData['data']['user']['league']
leagueStats: dict[str, Any] = {}
if league['gamesplayed'] == 0:
leagueStats['Played'] = False
else:
leagueStats['Played'] = True
leagueStats['PPS'] = league['pps']
leagueStats['APM'] = league['apm']
leagueStats['VS'] = 0 if league['vs'] is None else league['vs']
leagueStats['Rank'] = False if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] != -1:
leagueStats['Ranked'] = True
leagueStats['Rating'] = round(league['rating'], 2)
leagueStats['Glicko'] = round(league['glicko'], 2)
leagueStats['RD'] = round(league['rd'], 2)
else:
leagueStats['Ranked'] = False
leagueStats['Standing'] = league['standing']
leagueStats['LPM'] = round((league['pps'] * 24), 2)
leagueStats['APL'] = round(
(leagueStats['APM'] / leagueStats['LPM']), 2)
leagueStats['ADPM'] = round((leagueStats['VS'] * 0.6), 2)
leagueStats['ADPL'] = round(
(leagueStats['ADPM'] / leagueStats['LPM']), 2)
return leagueStats
async def getSprintStats(soloData: dict) -> Mapping[str, bool | int | float]:
# 获取40L统计数据
sprintStats = {}
if soloData['data']['records']['40l']['record'] is None:
sprintStats['Played'] = False
else:
sprintStats['Played'] = True
if soloData['data']['records']['40l']['rank'] is None:
sprintStats['Rank'] = False
else:
sprintStats['Rank'] = soloData['data']['records']['40l']['rank']
sprintStats['Time'] = round(
soloData['data']['records']['40l']['record']['endcontext']['finalTime'] / 1000, 2)
return sprintStats
async def getBlitzStats(soloData: dict) -> dict[str, Any]:
# 获取Blitz统计数据
blitzStats = {}
if soloData['data']['records']['blitz']['record'] is None:
blitzStats['Played'] = False
else:
blitzStats['Played'] = True
if soloData['data']['records']['blitz']['rank'] is None:
blitzStats['Rank'] = False
else:
blitzStats['Rank'] = soloData['data']['records']['blitz']['rank']
blitzStats['Score'] = soloData['data']['records']['blitz']['record']['endcontext']['score']
return blitzStats
async def generateMessage(userName: str = None, userID: str = None) -> str:
# 生成消息
userData, soloData = await gather(getUserData(userName=userName, userID=userID), getSoloData(userName=userName, userID=userID))
if userData[0] is False:
return '用户信息请求失败'
elif userData[1] is False:
return f'用户信息请求错误:\n{userData[2]["error"]}'
userName = userData[2]['data']['user']['username'].upper()
leagueStats = await getLeagueStats(userData[2])
message = ''
if leagueStats['Played'] is False:
message += f'用户 {userName} 没有排位统计数据'
else:
if leagueStats['Rank'] is False and leagueStats['Ranked'] is False:
message += f'用户 {userName} 暂未完成定级赛'
elif leagueStats['Rank'] is False and leagueStats['Ranked'] is True:
message += f'用户 {userName} 暂无段位, {leagueStats["Rating"]} TR'
else:
message += f'{leagueStats["Rank"]} 段用户 {userName} {leagueStats["Rating"]} TR (#{leagueStats["Standing"]})'
message += f', 段位分 {leagueStats["Glicko"]}±{leagueStats["RD"]}, 最近十场的数据:' if leagueStats['Ranked'] is True else ', 最近十场的数据:'
message += f'\nL\'PM: {leagueStats["LPM"]} ( {leagueStats["PPS"]} pps )'
message += f'\nAPM: {leagueStats["APM"]} ( x{leagueStats["APL"]} )'
if leagueStats["VS"] != 0:
message += f'\nADPM: {leagueStats["ADPM"]} ( x{leagueStats["ADPL"]} ) ( {leagueStats["VS"]}vs )'
if soloData[0] is False:
return f'{message}\nSolo统计数据请求失败'
elif soloData[1] is False:
return f'{message}\nSolo统计数据请求错误:\n{soloData[2]["error"]}'
sprintStats, blitzStats = await gather(getSprintStats(soloData[2]), getBlitzStats(soloData[2]))
if sprintStats['Played'] is True:
message += f'\n40L: {sprintStats["Time"]}s'
if sprintStats['Rank'] is not False:
message += f' ( #{sprintStats["Rank"]} )'
if blitzStats['Played'] is True:
message += f'\nBlitz: {blitzStats["Score"]}'
if blitzStats['Rank'] is not False:
message += f' ( #{blitzStats["Rank"]} )'
return message

View File

@@ -1,164 +0,0 @@
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from typing import Any
from asyncio import gather
from re import I
from ..Utils.Request import request
from ..Utils.MessageAnalyzer import handleStatsQueryMessage
tosStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
flags=I, permission=GROUP)
@tosStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decodedMessage = await handleStatsQueryMessage(message=event.raw_message, gameType='TOS')
if decodedMessage[0] is None:
await matcher.finish(decodedMessage[1][0])
elif decodedMessage[0] == 'AT' or decodedMessage[0] == 'QQ':
if decodedMessage[1][1] == event.self_id:
await matcher.finish(message='不能查询bot的信息')
message = await generateMessage(teaID=decodedMessage[1][1])
elif decodedMessage[0] == 'ME':
message = await generateMessage(teaID=event.sender.user_id)
elif decodedMessage[0] == 'Name':
message = await generateMessage(userName=decodedMessage[1][1])
await matcher.finish(message=message)
async def getUserInfo(userName: str = None, teaID: int = None) -> tuple[bool, bool, dict[str, Any]]:
# 获取用户信息
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getUsernameInfo?username={userName}'
elif userName is None and teaID is not None:
userDataUrl = f'https://teatube.cn:8888/getTeaIdInfo?teaId={teaID}'
else:
raise ValueError(
'[TETRIS STATS] TOSDataProcessing.getUserInfo: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getUserData(userName: str = None, teaID: int = None, otherParameter: str = '') -> tuple[bool, bool, dict[str, Any]]:
# 获取用户数据
if userName is not None and teaID is None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={userName}{otherParameter}'
elif userName is None and teaID is not None:
userDataUrl = f'https://teatube.cn:8888/getProfile?id={teaID}{otherParameter}'
else:
raise ValueError(
'[TETRIS STATS] TOSDataProcessing.getUserData: 预期外行为请上报GitHub')
return await request(Url=userDataUrl)
async def getRankStats(userInfo: dict) -> dict[str, bool | float]:
# 获取Rank数据
rankStats: dict[str, bool | float] = {}
if int(userInfo['data']['rankedGames']) == 0:
rankStats['Played'] = False
else:
rankStats['Played'] = True
rankStats['Rating'] = round(
float(userInfo['data']['ratingNow']), 2)
rankStats['RD'] = round(float(userInfo['data']['rdNow']), 2)
rankStats['Vol'] = round(float(userInfo['data']['volNow']), 3)
return rankStats
async def getGameData(userData: dict) -> dict[str, bool | int | float]:
# 获取游戏数据
gameData: dict[str, bool | int | float] = {}
if userData['data'] == []:
gameData['Played'] = False
else:
gameData['Played'] = True
weightedTotalLpm = weightedTotalApm = weightedTotalAdpm = weightedTotalTime = num = 0
for i in userData['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶不计算没挖掘的局即使apm和lpm也如此
if i['dig'] is None:
continue
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weightedTotalLpm += lpm * time
weightedTotalApm += apm * time
weightedTotalAdpm += adpm * time
weightedTotalTime += time
num += 1
if num == 50:
break
if num > 0:
gameData['NUM'] = num
gameData['LPM'] = round((weightedTotalLpm / weightedTotalTime), 2)
gameData['APM'] = round((weightedTotalApm / weightedTotalTime), 2)
gameData['ADPM'] = round((weightedTotalAdpm / weightedTotalTime), 2)
gameData['PPS'] = round((gameData['LPM'] / 24), 2)
gameData['APL'] = round((gameData['APM'] / gameData['LPM']), 2)
gameData['ADPL'] = round((gameData['ADPM'] / gameData['LPM']), 2)
gameData['VS'] = round((gameData['ADPM'] / 60 * 100), 2)
else:
gameData['Played'] = False
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
return gameData
async def getPBData(userInfo: dict) -> dict[str, bool | float | str]:
# 获取PB数据
PBData: dict[str, bool | float | str] = {}
if int(userInfo['data']['PBSprint']) == 2147483647:
PBData['Sprint'] = False
else:
PBData['Sprint'] = round(
float(userInfo['data']['PBSprint']) / 1000, 2)
if int(userInfo['data']['PBMarathon']) == 0:
PBData['Marathon'] = False
else:
PBData['Marathon'] = userInfo['data']['PBMarathon']
if int(userInfo['data']['PBChallenge']) == 0:
PBData['Challenge'] = False
else:
PBData['Challenge'] = userInfo['data']['PBChallenge']
return PBData
async def generateMessage(userName: str = None, teaID: int = None) -> str:
# 生成消息
userInfo, userData = await gather(getUserInfo(userName=userName, teaID=teaID), getUserData(userName=userName, teaID=teaID))
if userInfo[0] is False:
return f'用户信息请求失败'
elif userInfo[1] is False:
return f'用户信息请求错误:\n{userInfo[2]["error"]}'
rankStats, PBData = await gather(getRankStats(userInfo[2]), getPBData(userInfo[2]))
message = ''
if rankStats['Played'] is False:
message += f'用户 {userInfo[2]["data"]["name"]}{userInfo[2]["data"]["teaId"]})暂无段位统计数据'
elif rankStats['Played'] is True:
message += f'用户 {userInfo[2]["data"]["name"]} ({userInfo[2]["data"]["teaId"]}) , 段位分 {rankStats["Rating"]}±{rankStats["RD"]} ({rankStats["Vol"]}) '
if userData[0] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif userData[1] is False:
message = f'{message.rstrip()}\n游戏数据请求错误:\n{userData[2]["error"]}'
else:
gameData = await getGameData(userData[2])
if gameData['Played'] is False:
message += ', 暂无游戏数据'
elif gameData['Played'] is True:
message += f', 最近 {gameData["NUM"]} 局数据'
message += f'\nL\'PM: {gameData["LPM"]} ( {gameData["PPS"]} pps )'
message += f'\nAPM{gameData["APM"]} ( x{gameData["APL"]} )'
message += f'\nADPM{gameData["ADPM"]} ( x{gameData["ADPL"]} ) ( {gameData["VS"]}vs )'
if PBData['Sprint'] is not False:
message += f'\n40L: {PBData["Sprint"]}s'
if PBData['Marathon'] is not False:
message += f'\nMarathon: {PBData["Marathon"]}'
if PBData['Challenge'] is not False:
message += f'\nChallenge: {PBData["Challenge"]}'
return message

View File

@@ -1 +0,0 @@
from . import IODataProcessor, TOSDataProcessor

View File

@@ -1,79 +0,0 @@
from re import match, sub
async def handleBindMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError(
'[TETRIS STATS] MessageAnalyzer.handleBindMessage: 预期外行为请上报GitHub')
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
return await checkName(message, gameType)
async def handleStatsQueryMessage(message: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_CMD_ALIASES = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_ME = ['', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷',
'鄙人', '寡人', '小生', '贫僧', '本人', '', '', '', '', '', 'me']
# 剔除命令前缀
for i in _CMD_ALIASES[gameType]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
if message == '' or message.isspace():
return (None, ('用户名为空', None))
else:
if message.startswith('[CQ:at,qq='):
try:
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
except ValueError:
return (None, ('QQ号码不合法', None))
else:
return ('AT', (None, user))
elif message in _ME:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return ('ME', (None, None))
else:
return await checkName(message, gameType)
async def checkName(name: str, gameType: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if gameType == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return ('ID', (None, name))
elif match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return ('Name', (None, name.lower()))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return ('Name', (None, name))
else:
return (None, ('用户名不合法', None))
elif gameType == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
return ('Name', (None, name))
elif name.isdigit() is True:
return ('QQ', (None, name))
else:
return (None, ('用户名不合法', None))
else:
return (None, ('游戏类型错误', None))

View File

@@ -1,17 +0,0 @@
from nonebot.log import logger
from typing import Any
import aiohttp
async def request(Url: str) -> tuple[bool, bool, dict[str, Any]]:
# 封装请求函数
try:
async with aiohttp.ClientSession() as session:
async with session.get(Url) as resp:
data = await resp.json()
return (True, data['success'], data)
except aiohttp.client_exceptions.ClientConnectorError as e:
logger.error(f'[TETRIS STATS] request.request: 请求错误\n{e}')
return (False, False, {})

View File

@@ -1,59 +0,0 @@
from nonebot.log import logger
import sqlite3
import os
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
async def initDB():
# 初始化数据库
if not os.path.exists(os.path.dirname(_DB_FILE)):
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
os.rename('data/nonebot-plugin-tetris-stats',
os.path.dirname(_DB_FILE))
else:
os.makedirs(os.path.dirname(_DB_FILE))
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
db.commit()
db.close()
logger.info('数据库初始化完成')
async def queryBindInfo(QQNumber: str | int, gameType: str) -> str | None:
# 查询绑定信息
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
cursor.execute(f'SELECT USER FROM {gameType}BIND WHERE QQ = {QQNumber}')
user = cursor.fetchone()
db.commit()
db.close()
if user is None:
return None
else:
return user[0]
async def writeBindInfo(QQNumber: str | int, user: str, gameType: str) -> str:
# 写入绑定信息
bindInfo = await queryBindInfo(QQNumber, gameType)
db = sqlite3.connect(_DB_FILE)
cursor = db.cursor()
if bindInfo is not None:
cursor.execute(
f'UPDATE {gameType}BIND SET USER = ? WHERE QQ = ?', (user, QQNumber))
message = '更新成功'
elif bindInfo is None:
cursor.execute(
f'INSERT INTO {gameType}BIND (QQ, USER) VALUES (?, ?)', (QQNumber, user))
message = '绑定成功'
db.commit()
db.close()
return message

View File

@@ -1 +0,0 @@
from . import MessageAnalyzer, Request, SQL

View File

@@ -1,13 +1 @@
from nonebot import get_driver
from .Utils.SQL import initDB
from . import GameDataProcessor
driver = get_driver()
@driver.on_startup
async def startUP():
await initDB()
from . import game_data_processor

View File

@@ -0,0 +1 @@
from . import io_data_processor, top_data_processor, tos_data_processor

View File

@@ -0,0 +1,360 @@
import os
from asyncio import gather
from re import I
from typing import Any
import aiohttp
from nonebot import get_driver, on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.log import logger
from nonebot.matcher import Matcher
from playwright.async_api import Browser, Response, async_playwright
from ujson import JSONDecodeError, dumps, loads
from ..utils.config import Config
from ..utils.database import DataBase
from ..utils.message_analyzer import (
handle_bind_message,
handle_stats_query_message
)
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
driver = get_driver()
config = Config.parse_obj(get_driver().config)
@IOBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
if decoded_message[0] == 'ID':
user_id_stats = await check_user_id(decoded_message[1][1])
if user_id_stats[0] is False:
await matcher.finish(user_id_stats[1])
else:
user_id = decoded_message[1][1]
elif decoded_message[0] == 'Name':
user_data = await get_user_data(user_name=decoded_message[1][1])
if user_data[0] is False:
await matcher.finish('用户信息请求失败')
elif user_data[1] is False:
await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}')
else:
user_id = await get_user_id(user_data[2])
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(
await DataBase.write_bind_info(
qq_number=event.sender.user_id,
user=user_id,
game_type='IO'
)
)
@IOStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish('不能查询bot的信息')
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
elif decoded_message[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
elif decoded_message[0] == 'ID':
message = await generate_message(user_id=decoded_message[1][1])
elif decoded_message[0] == 'Name':
message = await generate_message(user_name=decoded_message[1][1])
await matcher.finish(message)
@driver.on_startup
async def _():
await Request.init_cache()
await Request.read_cache()
@driver.on_shutdown
async def _():
await Request.close_browser()
async def get_user_data(
user_name: str = None,
user_id: str = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据'''
if user_name is not None and user_id is None:
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
elif user_name is None and user_id is not None:
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await Request.request(user_data_url)
async def get_solo_data(
user_name: str = None,
user_id: str = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取Solo数据'''
if user_name is not None and user_id is None:
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
elif user_name is None and user_id is not None:
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await Request.request(user_solo_url)
async def get_user_id(user_data: dict) -> str:
'''获取用户ID'''
return user_data['data']['user']['_id']
async def check_user_id(user_id: str) -> tuple[bool, str]:
'''检查用户ID是否有效'''
user_data = await get_user_data(user_id=user_id)
if user_data[0] is False:
return False, '用户信息请求失败'
if user_data[1] is False:
return False, f'用户信息请求错误:\n{user_data[2]["error"]}'
if user_id == user_data[2]['data']['user']['_id']:
return True, ''
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下x')
async def get_league_stats(user_data: dict) -> dict[str, Any]:
'''获取排位统计数据'''
league = user_data['data']['user']['league']
league_stats: dict[str, Any] = {}
if league['gamesplayed'] != 0:
league_stats['PPS'] = league['pps']
league_stats['APM'] = league['apm']
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper()
if league['rating'] == -1:
league_stats['Rank'] = None
else:
league_stats['Rating'] = round(league['rating'], 2)
league_stats['Glicko'] = round(league['glicko'], 2)
league_stats['RD'] = round(league['rd'], 2)
league_stats['Standing'] = league['standing']
league_stats['LPM'] = round((league['pps'] * 24), 2)
league_stats['APL'] = round(
(league_stats['APM'] / league_stats['LPM']), 2)
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
league_stats['ADPL'] = round(
(league_stats['ADPM'] / league_stats['LPM']), 2)
return league_stats
async def get_sprint_stats(solo_data: dict) -> dict[str, Any]:
'''获取40L统计数据'''
sprint_stats = {}
solo = solo_data['data']['records']['40l']
if solo['record'] is not None:
sprint_stats['Time'] = round(
solo['record']['endcontext']['finalTime'] / 1000, 2)
if solo['rank'] is not None:
sprint_stats['Rank'] = solo['rank']
return sprint_stats
async def get_blitz_stats(solo_data: dict) -> dict[str, Any]:
'''获取Blitz统计数据'''
blitz_stats = {}
blitz = solo_data['data']['records']['blitz']
if blitz['record'] is not None:
blitz_stats['Score'] = blitz['record']['endcontext']['score']
if blitz['rank'] is not None:
blitz_stats['Rank'] = blitz['rank']
return blitz_stats
async def generate_message(user_name: str = None, user_id: str = None) -> str:
'''生成消息'''
user_data, solo_data = await gather(
get_user_data(user_name=user_name, user_id=user_id),
get_solo_data(user_name=user_name, user_id=user_id)
)
if user_data[0] is False:
return '用户信息请求失败'
if user_data[1] is False:
return f'用户信息请求错误:\n{user_data[2]["error"]}'
user_name = user_data[2]['data']['user']['username'].upper()
league_stats = await get_league_stats(user_data[2])
message = ''
if not league_stats:
message += f'用户 {user_name} 没有排位统计数据'
else:
if league_stats['Rank'] is None:
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
else:
if league_stats['Rank'] == 'Z':
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
else:
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
if league_stats["VS"] != 0:
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
if solo_data[0] is False:
return f'{message}\nSolo统计数据请求失败'
if solo_data[1] is False:
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
sprint_stats, blitz_stats = await gather(
get_sprint_stats(solo_data[2]),
get_blitz_stats(solo_data[2])
)
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
return message
class Request:
'''网络请求相关类'''
_browser: Browser | None = None
_headers: dict | None = None
_cookies: dict | None = None
@classmethod
async def _init_playwright(cls) -> Browser:
'''初始化playwright'''
playwright = await async_playwright().start()
cls._browser = await playwright.firefox.launch()
return cls._browser
@classmethod
async def _get_browser(cls) -> Browser:
'''获取浏览器对象'''
return cls._browser or await cls._init_playwright()
@classmethod
async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
'''用firefox硬穿五秒盾'''
browser = await cls._get_browser()
context = await browser.new_context()
page = await context.new_page()
response = await page.goto(url)
attempts = 0
while attempts < 60:
attempts += 1
text = await page.locator("body").text_content()
if text is None:
await page.wait_for_timeout(1000)
continue
if await page.title() == 'Please Wait... | Cloudflare':
# TODO 有无人来做一个过验证码(
break
try:
data = loads(text)
except JSONDecodeError:
await page.wait_for_timeout(1000)
else:
assert isinstance(response, Response)
cls._headers = await response.request.all_headers()
cls._cookies = {i['name']: i['value'] for i in await context.cookies()}
await cls._write_cache()
await page.close()
await context.close()
return True, data['success'], data
await page.close()
await context.close()
return True, False, {'error': '绕过五秒盾失败'}
@classmethod
async def init_cache(cls) -> None:
'''初始化缓存文件'''
if not os.path.exists(os.path.dirname(config.cache_path)):
os.makedirs(os.path.dirname(config.cache_path))
if not os.path.exists(config.cache_path):
with open(file=config.cache_path, mode='w', encoding='UTF-8') as file:
file.write(
dumps(
{
'headers': cls._headers,
'cookies': cls._cookies
}
)
)
@classmethod
async def read_cache(cls) -> None:
'''读取缓存文件'''
try:
with open(file=config.cache_path, mode='r', encoding='UTF-8') as file:
json = loads(file.read())
cls._headers = json['headers']
cls._cookies = json['cookies']
except FileNotFoundError:
await cls.init_cache()
except PermissionError:
os.remove(config.cache_path)
await cls.init_cache()
except JSONDecodeError:
os.remove(config.cache_path)
await cls.init_cache()
@classmethod
async def _write_cache(cls) -> None:
'''写入缓存文件'''
try:
with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file:
file.write(
dumps(
{
'headers': cls._headers,
'cookies': cls._cookies
}
)
)
except FileNotFoundError:
await cls.init_cache()
except PermissionError:
os.remove(config.cache_path)
await cls.init_cache()
except JSONDecodeError:
os.remove(config.cache_path)
await cls.init_cache()
@classmethod
async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
'''请求api'''
try:
async with aiohttp.ClientSession(cookies=cls._cookies) as session:
async with session.get(url, headers=cls._headers) as resp:
data = await resp.json()
return True, data['success'], data
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(f'请求错误\n{error}')
return False, False, {}
except aiohttp.client_exceptions.ContentTypeError:
return await cls._anti_cloudflare(url)
@classmethod
async def close_browser(cls) -> None:
'''关闭浏览器对象'''
if isinstance(cls._browser, Browser):
await cls._browser.close()

View File

@@ -0,0 +1,194 @@
from asyncio import gather
from re import I
from typing import Any
import aiohttp
from lxml import etree
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.log import logger
from nonebot.matcher import Matcher
from pandas import read_html
from ..utils.database import DataBase
from ..utils.message_analyzer import (
handle_bind_message,
handle_stats_query_message
)
TOPBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
TopStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@TOPBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_bind_message(message=event.raw_message, game_type='TOP')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'Name':
user_data = await get_user_data(decoded_message[1][1])
if user_data[0] is False:
await matcher.finish('用户信息请求失败')
else:
if await check_user(user_data[1]) is False:
await matcher.finish('用户不存在')
user_name = await get_user_name(user_data[1])
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(
await DataBase.write_bind_info(
qq_number=event.sender.user_id,
user=user_name,
game_type='TOP'
)
)
@TopStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOP')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish('不能查询bot的信息')
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
elif decoded_message[0] == 'ME':
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
elif decoded_message[0] == 'Name':
message = await generate_message(decoded_message[1][1])
await matcher.finish(message)
async def get_user_data(user_name: str) -> tuple[bool, str]:
'''获取用户信息'''
url = f'http://tetrisonline.pl/top/profile.php?user={user_name}'
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return True, await resp.text()
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(error)
return False, ''
async def check_user(user_data: str) -> bool:
'''如果用户存在返回True, 如果用户不存在返回False'''
return user_data.find('user not found!') == -1
async def get_user_name(user_data: str) -> str:
'''获取用户名'''
data = etree.HTML(user_data).xpath('//div[@class="mycontent"]/h1/text()')
if isinstance(data, list):
return str(data[0]).replace('\'s profile', '')
raise TypeError('预期外行为, 请上报GitHub')
async def get_game_stats(user_data: str) -> dict[str, dict[str, Any]]:
'''获取游戏统计数据'''
game_stats: dict[str, Any] = {'24H': {}, 'All': {}}
html = etree.HTML(user_data)
data = html.xpath('//div[@class="mycontent"]//text()')
if isinstance(data, list):
for i in data:
if not isinstance(i, str):
i = str(i)
i = i.strip()
if i.startswith('lpm:'):
game_stats['24H']['LPM'] = i.replace('lpm:', '').strip()
if i.startswith('apm:'):
game_stats['24H']['APM'] = i.replace('apm:', '').strip()
if 'LPM' in game_stats['24H'] and 'APM' in game_stats['24H']:
break
else:
raise TypeError('预期外行为, 请上报GitHub')
# 如果没有24H统计数据
if game_stats['24H'].get('LPM') in [None, ''] or game_stats['24H'].get('APM') in [None, '']:
game_stats['24H'].pop('LPM', None)
game_stats['24H'].pop('APM', None)
else:
game_stats['24H']['PPS'] = round(
float(game_stats['24H']['LPM']) / 24, 2)
game_stats['24H']['APL'] = round(
float(game_stats['24H']['APM']) / float(game_stats['24H']['LPM']), 2)
game_stats['24H']['LPM'] = round(float(game_stats['24H']['LPM']), 2)
game_stats['24H']['APM'] = round(float(game_stats['24H']['APM']), 2)
table = html.xpath('//table')
if isinstance(table, list):
if isinstance(table[0], etree._Element):
stats_table = etree.tostring(table[0], encoding='utf-8').decode()
df = read_html(stats_table, encoding='utf-8', header=0)[0]
results = df.T.to_dict().values()
if results:
game_stats['All']['LPM'] = 0
game_stats['All']['APM'] = 0
for i in results:
if isinstance(i, dict):
game_stats['All']['LPM'] += i['lpm']
game_stats['All']['APM'] += i['apm']
game_stats['All']['LPM'] = game_stats['All']['LPM'] / \
len(results)
game_stats['All']['APM'] = game_stats['All']['APM'] / \
len(results)
game_stats['All']['PPS'] = round(
game_stats['All']['LPM'] / 24, 2)
game_stats['All']['APL'] = round(
float(game_stats['All']['APM']) / float(game_stats['All']['LPM']), 2)
game_stats['All']['LPM'] = round(
float(game_stats['All']['LPM']), 2)
game_stats['All']['APM'] = round(
float(game_stats['All']['APM']), 2)
else:
raise TypeError('预期外行为, 请上报GitHub')
return game_stats
async def generate_message(user_name: str) -> str:
'''生成消息'''
user_data = await get_user_data(user_name)
if user_data[0] is False:
return '用户信息请求失败'
if await check_user(user_data[1]) is False:
return '用户不存在'
user_name, game_stats = await gather(
get_user_name(user_data[1]),
get_game_stats(user_data[1])
)
message = ''
if game_stats['24H'] and game_stats['All']:
message += f'用户 {user_name} 24小时内统计数据为: '
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
message += '\n历史统计数据为: '
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
elif game_stats['24H'] and not game_stats['All']:
message += f'用户 {user_name} 24小时内统计数据为: '
message += f'\nL\'PM: {game_stats["24H"]["LPM"]} ( {game_stats["24H"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["24H"]["APM"]} ( x{game_stats["24H"]["APL"]} )'
message += '\n暂无历史统计数据'
message += '\n( 这理论上不该存在, 如果你看到了, 请联系bot主人查看后台'
logger.error(f'老实说这个不算Error, 但是理论上不应该有, 如果你看到了这条日志, 我希望你能来Github发个issue\
user_name: {user_name}\
user_data: {user_data}\
game_stats: {game_stats}')
elif not game_stats['24H'] and game_stats['All']:
message += f'用户 {user_name} 暂无24小时内统计数据, 历史统计数据为: '
message += f'\nL\'PM: {game_stats["All"]["LPM"]} ( {game_stats["All"]["PPS"]} pps )'
message += f'\nAPM: {game_stats["All"]["APM"]} ( x{game_stats["All"]["APL"]} )'
else:
message += f'用户 {user_name} 暂无24小时内统计数据, 暂无历史统计数据'
return message

View File

@@ -0,0 +1,174 @@
from asyncio import gather
from re import I
from typing import Any
import aiohttp
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.log import logger
from nonebot.matcher import Matcher
from ..utils.message_analyzer import handle_stats_query_message
TOSStats = on_regex(
pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
flags=I,
permission=GROUP
)
@TOSStats.handle()
async def _(event: MessageEvent, matcher: Matcher):
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='TOS')
if decoded_message[0] is None:
await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ':
if decoded_message[1][1] == event.self_id:
await matcher.finish('不能查询bot的信息')
message = await generate_message(tea_id=decoded_message[1][1])
elif decoded_message[0] == 'ME':
message = await generate_message(tea_id=event.sender.user_id)
elif decoded_message[0] == 'Name':
message = await generate_message(user_name=decoded_message[1][1])
await matcher.finish(message)
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
'''请求api'''
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
data = await resp.json()
return True, data['success'], data
except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(f'请求错误\n{error}')
return False, False, {}
async def get_user_info(
user_name: str = None,
tea_id: int = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户信息'''
if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}'
elif user_name is None and tea_id is not None:
user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(user_data_url)
async def get_user_data(
user_name: str = None,
tea_id: int = None,
other_parameter: str = ''
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据'''
if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}'
elif user_name is None and tea_id is not None:
user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}'
else:
raise ValueError('预期外行为, 请上报GitHub')
return await request(user_data_url)
async def get_rank_stats(user_info: dict) -> dict[str, float]:
'''获取Rank数据'''
data = user_info['data']
if int(data['rankedGames']) != 0:
rank_stats = {}
rank_stats['Rating'] = round(float(data['ratingNow']), 2)
rank_stats['RD'] = round(float(data['rdNow']), 2)
rank_stats['Vol'] = round(float(data['volNow']), 3)
return rank_stats
async def get_game_data(user_data: dict) -> dict[str, int | float]:
'''获取游戏数据'''
if user_data['data'] != []:
game_data: dict[str, int | float] = {}
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = num = 0
for i in user_data['data']:
# 排除单人局和时间为0的游戏
if i['num_players'] == 1 or i['time'] == 0:
continue
# 茶:不计算没挖掘的局, 即使apm和lpm也如此
if i['dig'] is None:
continue
# 加权计算
time = i['time'] / 1000
lpm = 24 * (i['pieces'] / time)
apm = (i['attack'] / time) * 60
adpm = ((i['attack'] + i['dig']) / time) * 60
weighted_total_lpm += lpm * time
weighted_total_apm += apm * time
weighted_total_adpm += adpm * time
total_time += time
num += 1
if num == 50:
break
if num > 0:
game_data['NUM'] = num
game_data['LPM'] = round((weighted_total_lpm / total_time), 2)
game_data['APM'] = round((weighted_total_apm / total_time), 2)
game_data['ADPM'] = round((weighted_total_adpm / total_time), 2)
game_data['PPS'] = round((game_data['LPM'] / 24), 2)
game_data['APL'] = round((game_data['APM'] / game_data['LPM']), 2)
game_data['ADPL'] = round(
(game_data['ADPM'] / game_data['LPM']), 2)
game_data['VS'] = round((game_data['ADPM'] / 60 * 100), 2)
# TODO: 如果有效局数不满50, 没有无dig信息的局, 且userData['data']内有50个局, 则继续往前获取信息
return game_data
async def get_pb_data(user_info: dict) -> dict[str, float | str]:
'''获取PB数据'''
pb_data: dict[str, float | str] = {}
data = user_info['data']
if int(data['PBSprint']) != 2147483647:
pb_data['Sprint'] = round(float(data['PBSprint']) / 1000, 2)
if int(data['PBMarathon']) != 0:
pb_data['Marathon'] = data['PBMarathon']
if int(data['PBChallenge']) != 0:
pb_data['Challenge'] = data['PBChallenge']
return pb_data
async def generate_message(user_name: str = None, tea_id: int = None) -> str:
'''生成消息'''
user_info, user_data = await gather(
get_user_info(user_name=user_name, tea_id=tea_id),
get_user_data(user_name=user_name, tea_id=tea_id)
)
if user_info[0] is False:
return '用户信息请求失败'
if user_info[1] is False:
return f'用户信息请求错误:\n{user_info[2]["error"]}'
rank_stats, pb_data = await gather(
get_rank_stats(user_info[2]),
get_pb_data(user_info[2])
)
message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) '
if not rank_stats:
message += '暂无段位统计数据'
else:
message += f', 段位分 {rank_stats["Rating"]}±{rank_stats["RD"]} ({rank_stats["Vol"]}) '
if user_data[0] is False:
message = f'{message.rstrip()}\n游戏数据请求失败'
elif user_data[1] is False:
message = f'{message.rstrip()}\n游戏数据请求错误:\n{user_data[2]["error"]}'
else:
game_data = await get_game_data(user_data[2])
if not game_data:
message += ', 暂无游戏数据'
else:
message += f', 最近 {game_data["NUM"]} 局数据'
message += f'\nL\'PM: {game_data["LPM"]} ( {game_data["PPS"]} pps )'
message += f'\nAPM{game_data["APM"]} ( x{game_data["APL"]} )'
message += f'\nADPM{game_data["ADPM"]} ( x{game_data["ADPL"]} ) ( {game_data["VS"]}vs )'
message += f'\n40L: {pb_data["Sprint"]}s' if 'Sprint' in pb_data else ''
message += f'\nMarathon: {pb_data["Marathon"]}' if 'Marathon' in pb_data else ''
message += f'\nChallenge: {pb_data["Challenge"]}' if 'Challenge' in pb_data else ''
return message

View File

@@ -0,0 +1 @@
from . import config, database, message_analyzer

View File

@@ -0,0 +1,7 @@
from pydantic import BaseModel
class Config(BaseModel):
'''配置类'''
cache_path: str = 'cache/nonebot_plugin_tetris_stats/cache'
db_path: str = 'data/nonebot_plugin_tetris_stats/data.db'

View File

@@ -0,0 +1,86 @@
import os
from asyncio import gather
from sqlite3 import Connection, connect
from nonebot import get_driver
from nonebot.log import logger
from .config import Config
driver = get_driver()
config = Config.parse_obj(get_driver().config)
@driver.on_startup
async def _():
await DataBase.init_db()
@driver.on_shutdown
async def _():
await DataBase.close_db()
class DataBase():
'''数据库交互类'''
_db: Connection | None = None
@classmethod
async def init_db(cls) -> Connection:
'''初始化数据库'''
if not os.path.exists(os.path.dirname(config.db_path)):
os.makedirs(os.path.dirname(config.db_path))
cls._db = connect(config.db_path)
cursor = cls._db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cls._db.commit()
logger.info('数据库初始化完成')
return cls._db
@classmethod
async def _get_db(cls) -> Connection:
'''获取数据库对象'''
return cls._db or await cls.init_db()
@classmethod
async def query_bind_info(cls, qq_number: str | int, game_type: str) -> str | None:
'''查询绑定信息'''
db = await cls._get_db()
cursor = db.cursor()
cursor.execute(
f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
user = cursor.fetchone()
if user is None:
return None
return user[0]
@classmethod
async def write_bind_info(cls, qq_number: str | int, user: str, game_type: str) -> str:
'''写入绑定信息'''
bind_info, db = await gather(
cls.query_bind_info(qq_number=qq_number, game_type=game_type),
cls._get_db()
)
cursor = db.cursor()
if bind_info is not None:
cursor.execute(
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
message = '更新成功'
elif bind_info is None:
cursor.execute(
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
message = '绑定成功'
db.commit()
return message
@classmethod
async def close_db(cls) -> None:
'''关闭数据库对象'''
if isinstance(cls._db, Connection):
cls._db.close()

View File

@@ -0,0 +1,78 @@
from re import match, sub
async def handle_bind_message(message: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_cmd_aliases = {'IO': ['io绑定', 'iobind'],
'TOP': ['top绑定', 'topbind']}
# 剔除命令前缀
for i in _cmd_aliases[game_type]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError('预期外行为, 请上报GitHub')
if message == '' or message.isspace():
return None, ('用户名为空', None)
return await check_name(message, game_type)
async def handle_stats_query_message(message: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
_cmd_aliases = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']}
_me = [
'', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷', '鄙人', '寡人',
'小生', '贫僧', '本人', '', '', '', '', '', 'me', '洒家', '在下', '', '人家',
'本小姐', '老夫', '老子', '', '本尊', '', '拙者', '', '', '自分', '吾輩', '我輩', '',
'己等', '俺等', '此方', '', '', '劳资', '本宝宝', '', '本喵', 'watashi', 'i', 'myself',
'self', 'oneself'
]
# 剔除命令前缀
for i in _cmd_aliases[game_type]:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
if message == '' or message.isspace():
return None, ('用户名为空', None)
if message.startswith('[CQ:at,qq='):
try:
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
except ValueError:
return None, ('QQ号码不合法', None)
else:
return 'AT', (None, user)
elif message in _me:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return 'ME', (None, None)
else:
return await check_name(message, game_type)
async def check_name(name: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if game_type == 'IO':
if match(r'^[a-f0-9]{24}$', name):
return 'ID', (None, name)
if match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return 'Name', (None, name.lower())
return None, ('用户名不合法', None)
if game_type == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return 'Name', (None, name)
return None, ('用户名不合法', None)
if game_type == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False
and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长
# TODO 简化正则表达式
return 'Name', (None, name)
if name.isdigit() is True:
return 'QQ', (None, name)
return None, ('用户名不合法', None)
return None, ('游戏类型错误', None)

1312
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "nonebot-plugin-tetris-stats"
version = "0.2.0"
version = "0.3.2"
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
authors = ["scdhh <wallfjjd@gmail.com>"]
readme = "README.md"
@@ -9,15 +9,24 @@ repository = "https://github.com/shoucandanghehe/nonebot-plugin-tetris-stats"
license = "MIT"
[tool.poetry.dependencies]
python = "^3.10"
python = "^3.10,<3.11"
nonebot-adapter-onebot = "^2.0.0-beta.1"
aiohttp = "^3.8.1"
asyncio = "^3.4.3"
nonebot2 = "^2.0.0-beta.3"
lxml = "^4.9.1"
pandas = "^1.4.3"
playwright = "^1.24.1"
ujson = "^5.4.0"
Brotli = "^1.0.9"
[tool.poetry.dev-dependencies]
mypy = "^0.950"
autopep8 = "^1.6.0"
mypy = "^0.971"
autopep8 = "^1.7.0"
pylint = "^2.14.5"
types-ujson = "^5.4.0"
lxml-stubs = "^0.4.0"
pandas-stubs = "^1.4.3"
[build-system]
requires = ["poetry-core>=1.0.0"]