mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
Compare commits
103 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f600060b9e | ||
|
|
a808bfb7f8 | ||
| e5354d39c9 | |||
| 9d37d05b24 | |||
| d7b6e3cb17 | |||
|
|
32d34c93d7 | ||
| 972f7e90d2 | |||
|
|
fe412d5acd | ||
|
|
ac8d332e69 | ||
| 2015917ca0 | |||
|
|
264c5c04e9 | ||
| 14d646a2d0 | |||
|
|
d2a33ec5f8 | ||
|
|
c4c0faf64b | ||
|
|
a8e769866f | ||
|
|
38595d46b9 | ||
|
|
bd08166f45 | ||
|
|
5b1e2a1e28 | ||
|
|
903fe34302 | ||
|
|
0ea597809f | ||
|
|
58b4c8ab9a | ||
|
|
e82a9d4747 | ||
|
|
6125190fe0 | ||
|
|
402c30b32a | ||
|
|
4d51afa726 | ||
|
|
4af6170470 | ||
|
|
98244eed60 | ||
|
|
2e222ed6f5 | ||
|
|
c224a0f880 | ||
|
|
7df6cb3396 | ||
|
|
ee9da6a5e8 | ||
|
|
cc3a53aa3b | ||
|
|
ea767291a2 | ||
|
|
cffe472ad3 | ||
|
|
6e6b16c6f3 | ||
|
|
bdb0139cf1 | ||
|
|
ad9c7b24a6 | ||
|
|
6285900e16 | ||
|
|
a1d882c122 | ||
|
|
f6d5be7cc0 | ||
| 09e18c526a | |||
| af8fa05394 | |||
|
|
e099beeb6b | ||
|
|
5f8f8b67b2 | ||
|
|
a0b6c85637 | ||
|
|
163f0e139a | ||
|
|
9a02eb5cce | ||
|
|
2f9849c74e | ||
|
|
650dfb0669 | ||
|
|
a0e0ff2fa3 | ||
|
|
f5188594f1 | ||
|
|
26374a96fb | ||
|
|
67ccf465ce | ||
|
|
6bcde5cf74 | ||
|
|
1fffa560fb | ||
|
|
d995bfe74f | ||
|
|
9437d99ec4 | ||
|
|
4aed7862a3 | ||
|
|
8ee6648d2a | ||
|
|
1e0ca7149d | ||
|
|
dafb4352fa | ||
|
|
ae74cdbc0c | ||
|
|
364717f049 | ||
|
|
a05260ba4a | ||
|
|
a80d9586ff | ||
| d0a0baa275 | |||
|
|
625fc895ea | ||
|
|
3b12d74193 | ||
|
|
359f3964fb | ||
|
|
e655737935 | ||
|
|
98cdf6a529 | ||
|
|
53379eb951 | ||
|
|
4567764079 | ||
|
|
60d6e487d2 | ||
|
|
09c57f52ea | ||
|
|
bab4b06de0 | ||
|
|
175daafe5d | ||
|
|
a2d8bc55cc | ||
|
|
d1a2a20e13 | ||
|
|
e32a32dee0 | ||
|
|
540b01649d | ||
|
|
f224af7aa4 | ||
|
|
c5db3fef3f | ||
|
|
884cc08d77 | ||
|
|
b30d5e008d | ||
|
|
0a3e265272 | ||
|
|
b32c2f1895 | ||
| d62c53031e | |||
| 61b671354b | |||
| 3f24dc2f1d | |||
|
|
c63bb52540 | ||
|
|
a0bc307aa6 | ||
| d3258c2bb7 | |||
| b320a23c77 | |||
| 69e4cb97e3 | |||
| c6f83230c8 | |||
| e473dca4df | |||
|
|
68028cf3d9 | ||
|
|
10929bab03 | ||
|
|
8e5e7d3c33 | ||
| ec04af21d6 | |||
| 2aef281e8c | |||
| 2be62140ac |
12
.github/dependabot.yml
vendored
Normal file
12
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
target-branch: "dev"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
25
.github/workflows/Release.yml
vendored
Normal file
25
.github/workflows/Release.yml
vendored
Normal file
@@ -0,0 +1,25 @@
|
||||
name: Release CI
|
||||
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- "*"
|
||||
|
||||
jobs:
|
||||
release:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-python@v4
|
||||
with:
|
||||
python-version: '3.10'
|
||||
- name: Install Poetry
|
||||
shell: bash
|
||||
run: |
|
||||
pip install poetry
|
||||
- name: Build
|
||||
shell: bash
|
||||
run: |
|
||||
poetry install
|
||||
poetry env use python
|
||||
poetry publish --build -u ${{ secrets.USERNAME }} -p ${{ secrets.PASSWORD }} -n
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -1,4 +1,6 @@
|
||||
.idea
|
||||
dist
|
||||
test*
|
||||
Untitled*
|
||||
*copy*
|
||||
.vscode
|
||||
|
||||
@@ -2,13 +2,13 @@
|
||||
# A comma-separated list of package or module names from where C extensions may
|
||||
# be loaded. Extensions are loading into the active Python interpreter and may
|
||||
# run arbitrary code.
|
||||
extension-pkg-allow-list=lxml
|
||||
extension-pkg-allow-list=lxml, pydantic
|
||||
|
||||
# A comma-separated list of package or module names from where C extensions may
|
||||
# be loaded. Extensions are loading into the active Python interpreter and may
|
||||
# run arbitrary code. (This is an alternative name to extension-pkg-allow-list
|
||||
# for backward compatibility.)
|
||||
extension-pkg-whitelist=lxml
|
||||
extension-pkg-whitelist=lxml, pydantic
|
||||
|
||||
# List of module names for which member attributes should not be checked
|
||||
# (useful for modules/projects where namespaces are manipulated during runtime
|
||||
|
||||
@@ -1,262 +0,0 @@
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
from playwright.async_api import Browser, async_playwright
|
||||
from ujson import loads, JSONDecodeError
|
||||
import aiohttp
|
||||
|
||||
from nonebot import on_regex, get_driver
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
|
||||
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
|
||||
from ..utils.sql import query_bind_info, write_bind_info
|
||||
|
||||
_BROWSER: Browser | None = None
|
||||
|
||||
|
||||
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@IOBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
if decoded_message[0] == 'ID':
|
||||
user_id_stats = await check_user_id(user_id=decoded_message[1][1])
|
||||
if user_id_stats[0] is False:
|
||||
await matcher.finish(user_id_stats[1])
|
||||
else:
|
||||
user_id = decoded_message[1][1]
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await get_user_data(user_name=decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
await matcher.finish('用户信息请求失败')
|
||||
elif user_data[1] is False:
|
||||
await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}')
|
||||
else:
|
||||
user_id = await get_user_id(user_data=user_data[2])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id,
|
||||
user=user_id,
|
||||
game_type='IO'))
|
||||
|
||||
|
||||
@IOStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ME':
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败, 请联系bot主人')
|
||||
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ID':
|
||||
message = await generate_message(user_id=decoded_message[1][1])
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
if isinstance(_BROWSER, Browser):
|
||||
await _BROWSER.close()
|
||||
|
||||
|
||||
async def init_playwright() -> Browser:
|
||||
'''初始化playwright'''
|
||||
global _BROWSER
|
||||
p = await async_playwright().start()
|
||||
_BROWSER = await p.firefox.launch()
|
||||
return _BROWSER
|
||||
|
||||
|
||||
async def get_browser() -> Browser:
|
||||
'''获取浏览器对象'''
|
||||
return _BROWSER or await init_playwright()
|
||||
|
||||
|
||||
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
data = await resp.json()
|
||||
return (True, data['success'], data)
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return (False, False, {})
|
||||
except aiohttp.client_exceptions.ContentTypeError:
|
||||
# 如果有五秒盾就用firefox硬穿
|
||||
browser = await get_browser()
|
||||
page = await browser.new_page()
|
||||
await page.goto(url)
|
||||
attempts = 0
|
||||
while True:
|
||||
text = await page.locator("body").text_content()
|
||||
if text is None:
|
||||
continue
|
||||
attempts += 1
|
||||
if await page.title() == 'Please Wait... | Cloudflare':
|
||||
break
|
||||
try:
|
||||
data = loads(text)
|
||||
except JSONDecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
return (True, data['success'], data)
|
||||
if attempts >= 60:
|
||||
break
|
||||
await page.close()
|
||||
return (True, False, {'error': '绕过五秒盾失败'})
|
||||
|
||||
|
||||
async def get_user_data(user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
|
||||
|
||||
async def get_solo_data(user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取Solo数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_solo_url)
|
||||
|
||||
|
||||
async def get_user_id(user_data: dict) -> str:
|
||||
'''获取用户ID'''
|
||||
return user_data['data']['user']['_id']
|
||||
|
||||
|
||||
async def check_user_id(user_id: str) -> tuple[bool, str]:
|
||||
'''检查用户ID是否有效'''
|
||||
user_data = await get_user_data(user_id=user_id)
|
||||
if user_data[0] is False:
|
||||
return (False, '用户信息请求失败')
|
||||
if user_data[1] is False:
|
||||
return (False, f'用户信息请求错误:\n{user_data[2]["error"]}')
|
||||
if user_id == user_data[2]['data']['user']['_id']:
|
||||
return (True, '')
|
||||
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x')
|
||||
|
||||
|
||||
async def get_league_stats(user_data: dict) -> dict[str, Any]:
|
||||
'''获取排位统计数据'''
|
||||
league = user_data['data']['user']['league']
|
||||
league_stats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] != 0:
|
||||
league_stats['PPS'] = league['pps']
|
||||
league_stats['APM'] = league['apm']
|
||||
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper()
|
||||
if league['rating'] == -1:
|
||||
league_stats['Rank'] = None
|
||||
else:
|
||||
league_stats['Rating'] = round(league['rating'], 2)
|
||||
league_stats['Glicko'] = round(league['glicko'], 2)
|
||||
league_stats['RD'] = round(league['rd'], 2)
|
||||
league_stats['Standing'] = league['standing']
|
||||
league_stats['LPM'] = round((league['pps'] * 24), 2)
|
||||
league_stats['APL'] = round(
|
||||
(league_stats['APM'] / league_stats['LPM']), 2)
|
||||
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
|
||||
league_stats['ADPL'] = round(
|
||||
(league_stats['ADPM'] / league_stats['LPM']), 2)
|
||||
return league_stats
|
||||
|
||||
|
||||
async def get_sprint_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取40L统计数据'''
|
||||
sprint_stats = {}
|
||||
solo = solo_data['data']['records']['40l']
|
||||
if solo['record'] is not None:
|
||||
sprint_stats['Time'] = round(
|
||||
solo['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
if solo['rank'] is not None:
|
||||
sprint_stats['Rank'] = solo['rank']
|
||||
return sprint_stats
|
||||
|
||||
|
||||
async def get_blitz_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取Blitz统计数据'''
|
||||
blitz_stats = {}
|
||||
blitz = solo_data['data']['records']['blitz']
|
||||
if blitz['record'] is not None:
|
||||
blitz_stats['Score'] = blitz['record']['endcontext']['score']
|
||||
if blitz['rank'] is not None:
|
||||
blitz_stats['Rank'] = blitz['rank']
|
||||
return blitz_stats
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, user_id: str = None) -> str:
|
||||
'''生成消息'''
|
||||
user_data, solo_data = await gather(get_user_data(user_name=user_name, user_id=user_id),
|
||||
get_solo_data(user_name=user_name, user_id=user_id))
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_name = user_data[2]['data']['user']['username'].upper()
|
||||
league_stats = await get_league_stats(user_data[2])
|
||||
message = ''
|
||||
if not league_stats:
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if league_stats['Rank'] is None:
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league_stats['Rank'] == 'Z':
|
||||
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
|
||||
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
|
||||
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
|
||||
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
|
||||
if league_stats["VS"] != 0:
|
||||
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
|
||||
if solo_data[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
if solo_data[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
|
||||
sprint_stats, blitz_stats = await gather(get_sprint_stats(solo_data[2]),
|
||||
get_blitz_stats(solo_data[2]))
|
||||
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
|
||||
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
|
||||
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
|
||||
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
|
||||
return message
|
||||
@@ -0,0 +1,40 @@
|
||||
from re import I
|
||||
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
|
||||
from .processor import Processor
|
||||
|
||||
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
IORank = on_regex(pattern=r'^io段位|^iorank', flags=I, permission=GROUP)
|
||||
|
||||
@IOBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.finish(
|
||||
await Processor.handle_bind(
|
||||
message=event.raw_message,
|
||||
qq_number=event.sender.user_id
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@IOStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
if event.is_tome():
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
await matcher.finish(
|
||||
await Processor.handle_query(
|
||||
message=event.raw_message,
|
||||
qq_number=event.sender.user_id
|
||||
)
|
||||
)
|
||||
|
||||
@IORank.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.finish(
|
||||
await Processor.handle_rank(
|
||||
message=event.raw_message
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,289 @@
|
||||
import math
|
||||
from asyncio import gather
|
||||
from typing import Any
|
||||
|
||||
from nonebot.log import logger
|
||||
|
||||
from ...utils.database import DataBase
|
||||
from ...utils.message_analyzer import (
|
||||
handle_bind_message,
|
||||
handle_rank_message,
|
||||
handle_stats_query_message,
|
||||
)
|
||||
from .request import Request
|
||||
|
||||
|
||||
class Processor:
|
||||
@classmethod
|
||||
async def handle_bind(cls, message: str, qq_number: int | None) -> str:
|
||||
'''处理绑定消息'''
|
||||
decoded_message = await handle_bind_message(message=message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
return decoded_message[1][0]
|
||||
if decoded_message[0] == 'ID':
|
||||
user_id_stats = await cls.check_user_id(decoded_message[1][1])
|
||||
if user_id_stats[0] is False:
|
||||
return user_id_stats[1]
|
||||
user_id = decoded_message[1][1]
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await cls.get_user_data(user_name=decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_id = await cls.get_user_id(user_data[2])
|
||||
if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
return '获取QQ号失败'
|
||||
return (
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=qq_number,
|
||||
user=user_id,
|
||||
game_type='IO'
|
||||
)
|
||||
)
|
||||
logger.error('预期外行为, 请上报GitHub')
|
||||
return '出现预期外行为,请查看后台信息'
|
||||
|
||||
@classmethod
|
||||
async def handle_rank(cls, message: str):
|
||||
query_rank = await handle_rank_message(message)
|
||||
rank_info = await DataBase.query_rank_info_today(rank=query_rank.lower())
|
||||
|
||||
if rank_info is None:
|
||||
ranks_percentiles = {
|
||||
'x': 1,
|
||||
'u': 5,
|
||||
'ss': 11,
|
||||
's+': 17,
|
||||
's': 23,
|
||||
's-': 30,
|
||||
'a+': 38,
|
||||
'a': 46,
|
||||
'a-': 54,
|
||||
'b+': 62,
|
||||
'b': 70,
|
||||
'b-': 78,
|
||||
'c+': 84,
|
||||
'c': 90,
|
||||
'c-': 95,
|
||||
'd+': 97.5,
|
||||
'd': 100,
|
||||
}
|
||||
|
||||
if query_rank.lower() not in (i for i in ranks_percentiles.keys()):
|
||||
return '未知段位'
|
||||
|
||||
result = await Request.request('https://ch.tetr.io/api/users/lists/league/all')
|
||||
users: list = result[2]['data']['users']
|
||||
|
||||
def avg(rank_users: list, column: str, playercount: int | None = None) -> float:
|
||||
return sum(i['league'][column] for i in rank_users) / (playercount or len(rank_users))
|
||||
|
||||
for rank, percentile in ranks_percentiles.items():
|
||||
offset = math.floor((percentile / 100) * len(users)) - 1
|
||||
tr = users[offset]['league']['rating']
|
||||
|
||||
rank_users = list(filter(lambda x: x['league']['rank'] == rank, users))
|
||||
playercount = len(rank_users)
|
||||
|
||||
avg_apm = avg(rank_users, 'apm', playercount)
|
||||
avg_pps = avg(rank_users, 'pps', playercount)
|
||||
avg_vs = avg(rank_users, 'vs', playercount)
|
||||
|
||||
await DataBase.write_rank_info_today(
|
||||
rank=rank,
|
||||
trline=tr,
|
||||
playercount=playercount,
|
||||
avgapm=avg_apm,
|
||||
avgpps=avg_pps,
|
||||
avgvs=avg_vs
|
||||
)
|
||||
|
||||
return await Processor.handle_rank(message=message)
|
||||
else:
|
||||
avg_apm = round(rank_info[3], 2)
|
||||
avg_pps = round(rank_info[4], 2)
|
||||
avg_vs = round(rank_info[5], 2)
|
||||
avg_lpm = round((avg_pps * 24), 2)
|
||||
avg_apl = round((avg_apm / avg_lpm), 2)
|
||||
avg_adpm = round((avg_vs * 0.6), 2)
|
||||
avg_adpl = round((avg_adpm / avg_lpm), 2)
|
||||
|
||||
message = f'{query_rank.upper()} 段, 分数线 {round(rank_info[1], 2)} TR, {rank_info[2]} 名玩家'
|
||||
message += f'\n对比昨日趋势: {rank_info[0]}'
|
||||
message += '\n平均数据: '
|
||||
message += f"\nL'PM: {avg_lpm} ( {avg_pps} pps )"
|
||||
message += f'\nAPM: {avg_apm} ( x{avg_apl} )'
|
||||
message += f'\nADPM: {avg_adpm} ( x{avg_adpl} ) ( {avg_vs}vs )'
|
||||
message += '\n'
|
||||
message += f'\n数据更新时间: {rank_info[6]}'
|
||||
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
async def handle_query(cls, message: str, qq_number: int | None):
|
||||
'''处理查询消息'''
|
||||
decoded_message = await handle_stats_query_message(message=message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
return decoded_message[1][0]
|
||||
if decoded_message[0] == 'AT': # 在入口处判断是否@bot本身
|
||||
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
|
||||
if bind_info is None:
|
||||
return '未查询到绑定信息'
|
||||
return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}'
|
||||
if decoded_message[0] == 'ME':
|
||||
if qq_number is None:
|
||||
logger.error('获取QQ号失败')
|
||||
return '获取QQ号失败, 请联系bot主人'
|
||||
bind_info = await DataBase.query_bind_info(qq_number=qq_number, game_type='IO')
|
||||
if bind_info is None:
|
||||
return '未查询到绑定信息'
|
||||
return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}'
|
||||
if decoded_message[0] == 'ID':
|
||||
return await Processor.generate_message(user_id=decoded_message[1][1])
|
||||
if decoded_message[0] == 'Name':
|
||||
return await Processor.generate_message(user_name=decoded_message[1][1])
|
||||
|
||||
@classmethod
|
||||
async def get_user_data(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_data_url)
|
||||
|
||||
@classmethod
|
||||
async def get_solo_data(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取Solo数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_solo_url)
|
||||
|
||||
@classmethod
|
||||
async def get_user_id(cls, user_data: dict) -> str:
|
||||
'''获取用户ID'''
|
||||
return user_data['data']['user']['_id']
|
||||
|
||||
@classmethod
|
||||
async def check_user_id(cls, user_id: str) -> tuple[bool, str]:
|
||||
'''检查用户ID是否有效 返回值为tuple[bool, message]'''
|
||||
user_data = await cls.get_user_data(user_id=user_id)
|
||||
if user_data[0] is False:
|
||||
return False, '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return False, f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
if user_id == user_data[2]['data']['user']['_id']:
|
||||
return True, ''
|
||||
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x')
|
||||
|
||||
@classmethod
|
||||
async def get_league_stats(cls, user_data: dict) -> dict[str, Any]:
|
||||
'''获取排位统计数据'''
|
||||
league = user_data['data']['user']['league']
|
||||
league_stats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] != 0:
|
||||
league_stats['PPS'] = league['pps']
|
||||
league_stats['APM'] = league['apm']
|
||||
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper(
|
||||
)
|
||||
if league['rating'] == -1:
|
||||
league_stats['Rank'] = None
|
||||
else:
|
||||
league_stats['Rating'] = round(league['rating'], 2)
|
||||
league_stats['Glicko'] = round(league['glicko'], 2)
|
||||
league_stats['RD'] = round(league['rd'], 2)
|
||||
league_stats['Standing'] = league['standing']
|
||||
league_stats['LPM'] = round((league['pps'] * 24), 2)
|
||||
league_stats['APL'] = round(
|
||||
(league_stats['APM'] / league_stats['LPM']), 2)
|
||||
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
|
||||
league_stats['ADPL'] = round(
|
||||
(league_stats['ADPM'] / league_stats['LPM']), 2)
|
||||
return league_stats
|
||||
|
||||
@classmethod
|
||||
async def get_sprint_stats(cls, solo_data: dict) -> dict[str, Any]:
|
||||
'''获取40L统计数据'''
|
||||
sprint_stats = {}
|
||||
solo = solo_data['data']['records']['40l']
|
||||
if solo['record'] is not None:
|
||||
sprint_stats['Time'] = round(
|
||||
solo['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
if solo['rank'] is not None:
|
||||
sprint_stats['Rank'] = solo['rank']
|
||||
return sprint_stats
|
||||
|
||||
@classmethod
|
||||
async def get_blitz_stats(cls, solo_data: dict) -> dict[str, Any]:
|
||||
'''获取Blitz统计数据'''
|
||||
blitz_stats = {}
|
||||
blitz = solo_data['data']['records']['blitz']
|
||||
if blitz['record'] is not None:
|
||||
blitz_stats['Score'] = blitz['record']['endcontext']['score']
|
||||
if blitz['rank'] is not None:
|
||||
blitz_stats['Rank'] = blitz['rank']
|
||||
return blitz_stats
|
||||
|
||||
@classmethod
|
||||
async def generate_message(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> str:
|
||||
'''生成消息'''
|
||||
user_data, solo_data = await gather(
|
||||
cls.get_user_data(user_name=user_name, user_id=user_id),
|
||||
cls.get_solo_data(user_name=user_name, user_id=user_id)
|
||||
)
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_name = user_data[2]['data']['user']['username'].upper()
|
||||
league_stats = await cls.get_league_stats(user_data[2])
|
||||
message = ''
|
||||
if not league_stats:
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if league_stats['Rank'] is None:
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league_stats['Rank'] == 'Z':
|
||||
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
|
||||
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
|
||||
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
|
||||
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
|
||||
if league_stats["VS"] != 0:
|
||||
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
|
||||
if solo_data[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
if solo_data[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
|
||||
sprint_stats, blitz_stats = await gather(
|
||||
cls.get_sprint_stats(solo_data[2]),
|
||||
cls.get_blitz_stats(solo_data[2])
|
||||
)
|
||||
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
|
||||
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
|
||||
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
|
||||
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
|
||||
return message
|
||||
@@ -0,0 +1,155 @@
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from playwright.async_api import Browser, Response, async_playwright
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
|
||||
from ...utils.config import Config
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
config = Config.parse_obj(get_driver().config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await Request.init_cache()
|
||||
await Request.read_cache()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await Request.close_browser()
|
||||
await Request.write_cache()
|
||||
|
||||
|
||||
class Request:
|
||||
'''网络请求相关类'''
|
||||
_browser: Browser | None = None
|
||||
_headers: dict | None = None
|
||||
_cookies: dict | None = None
|
||||
|
||||
@classmethod
|
||||
async def _init_playwright(cls) -> Browser:
|
||||
'''初始化playwright'''
|
||||
playwright = await async_playwright().start()
|
||||
cls._browser = await playwright.firefox.launch()
|
||||
return cls._browser
|
||||
|
||||
@classmethod
|
||||
async def _get_browser(cls) -> Browser:
|
||||
'''获取浏览器对象'''
|
||||
return cls._browser or await cls._init_playwright()
|
||||
|
||||
@classmethod
|
||||
async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''用firefox硬穿五秒盾'''
|
||||
browser = await cls._get_browser()
|
||||
context = await browser.new_context()
|
||||
page = await context.new_page()
|
||||
response = await page.goto(url)
|
||||
attempts = 0
|
||||
while attempts < 60:
|
||||
attempts += 1
|
||||
text = await page.locator("body").text_content()
|
||||
if text is None:
|
||||
await page.wait_for_timeout(1000)
|
||||
continue
|
||||
if await page.title() == 'Please Wait... | Cloudflare':
|
||||
# TODO 有无人来做一个过验证码(
|
||||
break
|
||||
try:
|
||||
data = loads(text)
|
||||
except JSONDecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
assert isinstance(response, Response)
|
||||
cls._headers = await response.request.all_headers()
|
||||
try:
|
||||
cls._cookies = {i['name']: i['value'] for i in await context.cookies()}
|
||||
except KeyError:
|
||||
cls._cookies = None
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, data['success'], data
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, False, {'error': '绕过五秒盾失败'}
|
||||
|
||||
@classmethod
|
||||
async def init_cache(cls) -> None:
|
||||
'''初始化缓存文件'''
|
||||
if not os.path.exists(os.path.dirname(config.cache_path)):
|
||||
os.makedirs(os.path.dirname(config.cache_path))
|
||||
if not os.path.exists(config.cache_path):
|
||||
with open(file=config.cache_path, mode='w', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def read_cache(cls) -> None:
|
||||
'''读取缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r', encoding='UTF-8') as file:
|
||||
json = loads(file.read())
|
||||
cls._headers = json['headers']
|
||||
cls._cookies = json['cookies']
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def write_cache(cls) -> None:
|
||||
'''写入缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession(cookies=cls._cookies) as session:
|
||||
async with session.get(url, headers=cls._headers) as resp:
|
||||
data = await resp.json()
|
||||
return True, data['success'], data
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return False, False, {}
|
||||
except aiohttp.client_exceptions.ContentTypeError: # type: ignore
|
||||
return await cls._anti_cloudflare(url)
|
||||
|
||||
@classmethod
|
||||
async def close_browser(cls) -> None:
|
||||
'''关闭浏览器对象'''
|
||||
if isinstance(cls._browser, Browser):
|
||||
await cls._browser.close()
|
||||
@@ -1,16 +1,20 @@
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
from lxml import etree
|
||||
from pandas import read_html
|
||||
import aiohttp
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from lxml import etree
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
from nonebot.matcher import Matcher
|
||||
from pandas import read_html
|
||||
|
||||
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
|
||||
from ..utils.sql import query_bind_info, write_bind_info
|
||||
from ..utils.database import DataBase
|
||||
from ..utils.message_analyzer import (
|
||||
handle_bind_message,
|
||||
handle_stats_query_message
|
||||
)
|
||||
|
||||
TOPBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
|
||||
TopStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
|
||||
@@ -29,10 +33,16 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
if await check_user(user_data[1]) is False:
|
||||
await matcher.finish('用户不存在')
|
||||
user_name = await get_user_name(user_data[1])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP'))
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=event.sender.user_id,
|
||||
user=user_name,
|
||||
game_type='TOP'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@TopStats.handle()
|
||||
@@ -42,8 +52,8 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
@@ -52,14 +62,16 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败, 请联系bot主人')
|
||||
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
|
||||
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
await matcher.finish(message)
|
||||
|
||||
|
||||
async def get_user_data(user_name: str) -> tuple[bool, str]:
|
||||
@@ -68,10 +80,10 @@ async def get_user_data(user_name: str) -> tuple[bool, str]:
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
return (True, await resp.text())
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
return True, await resp.text()
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(error)
|
||||
return (False, '')
|
||||
return False, ''
|
||||
|
||||
|
||||
async def check_user(user_data: str) -> bool:
|
||||
@@ -153,8 +165,10 @@ async def generate_message(user_name: str) -> str:
|
||||
return '用户信息请求失败'
|
||||
if await check_user(user_data[1]) is False:
|
||||
return '用户不存在'
|
||||
user_name = await get_user_name(user_data[1])
|
||||
game_stats = await get_game_stats(user_data[1])
|
||||
user_name, game_stats = await gather(
|
||||
get_user_name(user_data[1]),
|
||||
get_game_stats(user_data[1])
|
||||
)
|
||||
message = ''
|
||||
if game_stats['24H'] and game_stats['All']:
|
||||
message += f'用户 {user_name} 24小时内统计数据为: '
|
||||
|
||||
@@ -1,17 +1,20 @@
|
||||
from typing import Any
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
import aiohttp
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.log import logger
|
||||
from nonebot.matcher import Matcher
|
||||
|
||||
from ..utils.message_analyzer import handle_stats_query_message
|
||||
|
||||
TOSStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
|
||||
flags=I, permission=GROUP)
|
||||
TOSStats = on_regex(
|
||||
pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
|
||||
flags=I,
|
||||
permission=GROUP
|
||||
)
|
||||
|
||||
|
||||
@TOSStats.handle()
|
||||
@@ -21,13 +24,15 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ':
|
||||
if decoded_message[1][1] == event.self_id:
|
||||
await matcher.finish(message='不能查询bot的信息')
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
message = await generate_message(tea_id=decoded_message[1][1])
|
||||
elif decoded_message[0] == 'ME':
|
||||
message = await generate_message(tea_id=event.sender.user_id)
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
await matcher.finish(message=message)
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
await matcher.finish(message)
|
||||
|
||||
|
||||
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
@@ -36,15 +41,16 @@ async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
data = await resp.json()
|
||||
return (True, data['success'], data)
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
return True, data['success'], data
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return (False, False, {})
|
||||
return False, False, {}
|
||||
|
||||
|
||||
async def get_user_info(user_name: str = None,
|
||||
tea_id: int = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
async def get_user_info(
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户信息'''
|
||||
if user_name is not None and tea_id is None:
|
||||
user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}'
|
||||
@@ -52,13 +58,14 @@ async def get_user_info(user_name: str = None,
|
||||
user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
return await request(user_data_url)
|
||||
|
||||
|
||||
async def get_user_data(user_name: str = None,
|
||||
tea_id: int = None,
|
||||
other_parameter: str = ''
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
async def get_user_data(
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None,
|
||||
other_parameter: str = ''
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and tea_id is None:
|
||||
user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}'
|
||||
@@ -66,14 +73,14 @@ async def get_user_data(user_name: str = None,
|
||||
user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await request(url=user_data_url)
|
||||
return await request(user_data_url)
|
||||
|
||||
|
||||
async def get_rank_stats(user_info: dict) -> dict[str, float]:
|
||||
'''获取Rank数据'''
|
||||
data = user_info['data']
|
||||
rank_stats = {}
|
||||
if int(data['rankedGames']) != 0:
|
||||
rank_stats = {}
|
||||
rank_stats['Rating'] = round(float(data['ratingNow']), 2)
|
||||
rank_stats['RD'] = round(float(data['rdNow']), 2)
|
||||
rank_stats['Vol'] = round(float(data['volNow']), 3)
|
||||
@@ -82,8 +89,8 @@ async def get_rank_stats(user_info: dict) -> dict[str, float]:
|
||||
|
||||
async def get_game_data(user_data: dict) -> dict[str, int | float]:
|
||||
'''获取游戏数据'''
|
||||
game_data: dict[str, int | float] = {}
|
||||
if user_data['data'] != []:
|
||||
game_data: dict[str, int | float] = {}
|
||||
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = num = 0
|
||||
for i in user_data['data']:
|
||||
# 排除单人局和时间为0的游戏
|
||||
@@ -131,15 +138,23 @@ async def get_pb_data(user_info: dict) -> dict[str, float | str]:
|
||||
return pb_data
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, tea_id: int = None) -> str:
|
||||
async def generate_message(
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None
|
||||
) -> str:
|
||||
'''生成消息'''
|
||||
user_info, user_data = await gather(get_user_info(user_name=user_name, tea_id=tea_id),
|
||||
get_user_data(user_name=user_name, tea_id=tea_id))
|
||||
user_info, user_data = await gather(
|
||||
get_user_info(user_name=user_name, tea_id=tea_id),
|
||||
get_user_data(user_name=user_name, tea_id=tea_id)
|
||||
)
|
||||
if user_info[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_info[1] is False:
|
||||
return f'用户信息请求错误:\n{user_info[2]["error"]}'
|
||||
rank_stats, pb_data = await gather(get_rank_stats(user_info[2]), get_pb_data(user_info[2]))
|
||||
rank_stats, pb_data = await gather(
|
||||
get_rank_stats(user_info[2]),
|
||||
get_pb_data(user_info[2])
|
||||
)
|
||||
message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) '
|
||||
if not rank_stats:
|
||||
message += '暂无段位统计数据'
|
||||
|
||||
@@ -1 +1 @@
|
||||
from . import message_analyzer, sql
|
||||
from . import config, database, message_analyzer
|
||||
|
||||
7
nonebot_plugin_tetris_stats/utils/config.py
Normal file
7
nonebot_plugin_tetris_stats/utils/config.py
Normal file
@@ -0,0 +1,7 @@
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
'''配置类'''
|
||||
cache_path: str = 'cache/nonebot_plugin_tetris_stats/cache'
|
||||
db_path: str = 'data/nonebot_plugin_tetris_stats/data.db'
|
||||
140
nonebot_plugin_tetris_stats/utils/database.py
Normal file
140
nonebot_plugin_tetris_stats/utils/database.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import datetime
|
||||
import os
|
||||
from asyncio import gather
|
||||
from sqlite3 import Connection, connect
|
||||
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
|
||||
from .config import Config
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
config = Config.parse_obj(get_driver().config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await DataBase.init_db()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await DataBase.close_db()
|
||||
|
||||
|
||||
class DataBase():
|
||||
'''数据库交互类'''
|
||||
_db: Connection | None = None
|
||||
|
||||
@classmethod
|
||||
async def init_db(cls) -> Connection:
|
||||
'''初始化数据库'''
|
||||
if not os.path.exists(os.path.dirname(config.db_path)):
|
||||
os.makedirs(os.path.dirname(config.db_path))
|
||||
cls._db = connect(config.db_path)
|
||||
cursor = cls._db.cursor()
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
|
||||
(QQ INTEGER NOT NULL,
|
||||
USER TEXT NOT NULL)''')
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
|
||||
(QQ INTEGER NOT NULL,
|
||||
USER TEXT NOT NULL)''')
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS IORANK
|
||||
(RANK VARCHAR(2) NOT NULL,
|
||||
TRENDING CHAR(1) NOT NULL,
|
||||
TRLINE FLOAT NOT NULL,
|
||||
PLAYERCOUNT INTEGER NOT NULL,
|
||||
AVGAPM FLOAT NOT NULL,
|
||||
AVGPPS FLOAT NOT NULL,
|
||||
ARGVS FLOAT NOT NULL,
|
||||
DATE TEXT NOT NULL)''')
|
||||
cls._db.commit()
|
||||
logger.info('数据库初始化完成')
|
||||
return cls._db
|
||||
|
||||
@classmethod
|
||||
async def query_rank_info_today(cls, rank: str) -> list | None:
|
||||
'''查询段位信息'''
|
||||
db = await cls._get_db()
|
||||
cursor = db.cursor()
|
||||
cursor.execute('''SELECT TRENDING, TRLINE, PLAYERCOUNT, AVGAPM, AVGPPS, ARGVS, DATE
|
||||
FROM IORANK
|
||||
WHERE RANK = ? AND DATE = ?''', (rank, datetime.date.today()))
|
||||
result = cursor.fetchone()
|
||||
|
||||
if result is None:
|
||||
return None
|
||||
|
||||
return list(result)
|
||||
|
||||
@classmethod
|
||||
async def write_rank_info_today(cls, rank: str, trline: int, playercount: int, avgapm: float, avgpps: float, avgvs: float):
|
||||
'''写入段位信息'''
|
||||
|
||||
db = await cls._get_db()
|
||||
cursor = db.cursor()
|
||||
cursor.execute('''SELECT TRLINE
|
||||
FROM IORANK
|
||||
WHERE RANK = ? AND DATE = ?''', (rank, datetime.date.today() - datetime.timedelta(days=1)))
|
||||
|
||||
result = cursor.fetchone()
|
||||
|
||||
if result is None:
|
||||
trending = '?'
|
||||
else:
|
||||
if result[0] > trline:
|
||||
trending = '↑'
|
||||
else:
|
||||
trending = '↓'
|
||||
|
||||
cursor.execute('''INSERT INTO IORANK
|
||||
(RANK, TRENDING, TRLINE, PLAYERCOUNT, AVGAPM, AVGPPS, ARGVS, DATE)
|
||||
VALUES (?,?,?,?,?,?,?,?)''',
|
||||
(rank, trending, trline, playercount, avgapm, avgpps, avgvs, datetime.date.today()))
|
||||
|
||||
db.commit()
|
||||
|
||||
@classmethod
|
||||
async def _get_db(cls) -> Connection:
|
||||
'''获取数据库对象'''
|
||||
return cls._db or await cls.init_db()
|
||||
|
||||
@classmethod
|
||||
async def query_bind_info(cls, qq_number: str | int, game_type: str) -> str | None:
|
||||
'''查询绑定信息'''
|
||||
db = await cls._get_db()
|
||||
cursor = db.cursor()
|
||||
cursor.execute(
|
||||
f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
|
||||
user = cursor.fetchone()
|
||||
if user is None:
|
||||
return None
|
||||
return user[0]
|
||||
|
||||
@classmethod
|
||||
async def write_bind_info(cls, qq_number: str | int, user: str, game_type: str) -> str:
|
||||
'''写入绑定信息'''
|
||||
bind_info, db = await gather(
|
||||
cls.query_bind_info(qq_number=qq_number, game_type=game_type),
|
||||
cls._get_db()
|
||||
)
|
||||
cursor = db.cursor()
|
||||
if bind_info is not None:
|
||||
cursor.execute(
|
||||
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
|
||||
message = '更新成功'
|
||||
elif bind_info is None:
|
||||
cursor.execute(
|
||||
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
|
||||
message = '绑定成功'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
db.commit()
|
||||
return message
|
||||
|
||||
@classmethod
|
||||
async def close_db(cls) -> None:
|
||||
'''关闭数据库对象'''
|
||||
if isinstance(cls._db, Connection):
|
||||
cls._db.close()
|
||||
@@ -14,7 +14,7 @@ async def handle_bind_message(message: str, game_type: str) -> tuple[str | None,
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
if message == '' or message.isspace():
|
||||
return (None, ('用户名为空', None))
|
||||
return None, ('用户名为空', None)
|
||||
return await check_name(message, game_type)
|
||||
|
||||
|
||||
@@ -23,11 +23,13 @@ async def handle_stats_query_message(message: str, game_type: str) -> tuple[str
|
||||
_cmd_aliases = {'IO': ['io查', 'iostats'],
|
||||
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
|
||||
'TOP': ['top查', 'topstats']}
|
||||
_me = ['我', '自己', '我等', '卑人', '愚', '老身', '爷', '老娘', '本姑娘', '本大爷', '鄙人', '寡人',
|
||||
'小生', '贫僧', '本人', '孤', '吾', '俺', '咱', '私', 'me', '洒家', '在下', '偶', '人家',
|
||||
'本小姐', '老夫', '老子', '朕', '本尊', '僕', '拙者', '妾', '儂', '自分', '吾輩', '我輩', '某',
|
||||
'己等', '俺等', '此方', '哥', '姐', '劳资', '本宝宝', '余', '本喵', 'watashi', 'i', 'myself',
|
||||
'self', 'oneself']
|
||||
_me = [
|
||||
'我', '自己', '我等', '卑人', '愚', '老身', '爷', '老娘', '本姑娘', '本大爷', '鄙人', '寡人',
|
||||
'小生', '贫僧', '本人', '孤', '吾', '俺', '咱', '私', 'me', '洒家', '在下', '偶', '人家',
|
||||
'本小姐', '老夫', '老子', '朕', '本尊', '僕', '拙者', '妾', '儂', '自分', '吾輩', '我輩', '某',
|
||||
'己等', '俺等', '此方', '哥', '姐', '劳资', '本宝宝', '余', '本喵', 'watashi', 'i', 'myself',
|
||||
'self', 'oneself'
|
||||
]
|
||||
# 剔除命令前缀
|
||||
for i in _cmd_aliases[game_type]:
|
||||
if match(rf'(?i){i}', message):
|
||||
@@ -35,41 +37,55 @@ async def handle_stats_query_message(message: str, game_type: str) -> tuple[str
|
||||
message = message.strip()
|
||||
break
|
||||
if message == '' or message.isspace():
|
||||
return (None, ('用户名为空', None))
|
||||
return None, ('用户名为空', None)
|
||||
if message.startswith('[CQ:at,qq='):
|
||||
try:
|
||||
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
|
||||
except ValueError:
|
||||
return (None, ('QQ号码不合法', None))
|
||||
return None, ('QQ号码不合法', None)
|
||||
else:
|
||||
return ('AT', (None, user))
|
||||
return 'AT', (None, user)
|
||||
elif message in _me:
|
||||
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
|
||||
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
|
||||
return ('ME', (None, None))
|
||||
return 'ME', (None, None)
|
||||
else:
|
||||
return await check_name(message, game_type)
|
||||
|
||||
|
||||
async def handle_rank_message(message: str) -> str:
|
||||
_cmd_aliases = ['io段位', 'iorank']
|
||||
# 剔除命令前缀
|
||||
for i in _cmd_aliases:
|
||||
if match(rf'(?i){i}', message):
|
||||
message = sub(rf'(?i){i}', '', message)
|
||||
message = message.strip()
|
||||
break
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return message
|
||||
|
||||
|
||||
async def check_name(name: str, game_type: str) -> tuple[str | None, tuple]:
|
||||
'''返回值为tuple[gameType, tuple[message, user]]'''
|
||||
if game_type == 'IO':
|
||||
if match(r'^[a-f0-9]{24}$', name):
|
||||
return ('ID', (None, name))
|
||||
return 'ID', (None, name)
|
||||
if match(r'^[a-zA-Z0-9_-]{3,16}$', name):
|
||||
return ('Name', (None, name.lower()))
|
||||
return (None, ('用户名不合法', None))
|
||||
return 'Name', (None, name.lower())
|
||||
return None, ('用户名不合法', None)
|
||||
if game_type == 'TOP':
|
||||
if match(r'^[a-zA-Z0-9_]{1,16}$', name):
|
||||
return ('Name', (None, name))
|
||||
return (None, ('用户名不合法', None))
|
||||
return 'Name', (None, name)
|
||||
return None, ('用户名不合法', None)
|
||||
if game_type == 'TOS':
|
||||
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
|
||||
and name.isdigit() is False
|
||||
and 2 <= len(name) <= 18):
|
||||
# 虽然我也不想这么长 但是似乎确实得这么长
|
||||
return ('Name', (None, name))
|
||||
# TODO 简化正则表达式
|
||||
return 'Name', (None, name)
|
||||
if name.isdigit() is True:
|
||||
return ('QQ', (None, name))
|
||||
return (None, ('用户名不合法', None))
|
||||
return (None, ('游戏类型错误', None))
|
||||
return 'QQ', (None, name)
|
||||
return None, ('用户名不合法', None)
|
||||
return None, ('游戏类型错误', None)
|
||||
|
||||
@@ -1,79 +0,0 @@
|
||||
from sqlite3 import connect, Connection
|
||||
import os
|
||||
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
|
||||
|
||||
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
|
||||
_DB: Connection | None = None
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
'''初始化数据库'''
|
||||
await init_db()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
if isinstance(_DB, Connection):
|
||||
await _DB.close()
|
||||
|
||||
|
||||
async def init_db() -> Connection:
|
||||
'''初始化数据库'''
|
||||
if not os.path.exists(os.path.dirname(_DB_FILE)):
|
||||
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
|
||||
os.rename('data/nonebot-plugin-tetris-stats',
|
||||
os.path.dirname(_DB_FILE))
|
||||
else:
|
||||
os.makedirs(os.path.dirname(_DB_FILE))
|
||||
global _DB
|
||||
_DB = connect(_DB_FILE)
|
||||
cursor = _DB.cursor()
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
|
||||
(QQ INTEGER NOT NULL,
|
||||
USER TEXT NOT NULL)''')
|
||||
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
|
||||
(QQ INTEGER NOT NULL,
|
||||
USER TEXT NOT NULL)''')
|
||||
_DB.commit()
|
||||
logger.info('数据库初始化完成')
|
||||
return _DB
|
||||
|
||||
|
||||
async def get_db() -> Connection:
|
||||
'''获取数据库对象'''
|
||||
return _DB or await init_db()
|
||||
|
||||
|
||||
async def query_bind_info(qq_number: str | int, game_type: str) -> str | None:
|
||||
'''查询绑定信息'''
|
||||
db = await get_db()
|
||||
cursor = db.cursor()
|
||||
cursor.execute(f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
|
||||
user = cursor.fetchone()
|
||||
db.commit()
|
||||
if user is None:
|
||||
return None
|
||||
return user[0]
|
||||
|
||||
|
||||
async def write_bind_info(qq_number: str | int, user: str, game_type: str) -> str:
|
||||
'''写入绑定信息'''
|
||||
bind_info = await query_bind_info(qq_number, game_type)
|
||||
db = await get_db()
|
||||
cursor = db.cursor()
|
||||
if bind_info is not None:
|
||||
cursor.execute(
|
||||
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
|
||||
message = '更新成功'
|
||||
elif bind_info is None:
|
||||
cursor.execute(
|
||||
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
|
||||
message = '绑定成功'
|
||||
db.commit()
|
||||
return message
|
||||
2839
poetry.lock
generated
2839
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "nonebot-plugin-tetris-stats"
|
||||
version = "0.3.0"
|
||||
version = "0.4.1"
|
||||
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
|
||||
authors = ["scdhh <wallfjjd@gmail.com>"]
|
||||
readme = "README.md"
|
||||
@@ -9,7 +9,7 @@ repository = "https://github.com/shoucandanghehe/nonebot-plugin-tetris-stats"
|
||||
license = "MIT"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.10,<3.11"
|
||||
python = "^3.10,<3.12"
|
||||
nonebot-adapter-onebot = "^2.0.0-beta.1"
|
||||
aiohttp = "^3.8.1"
|
||||
asyncio = "^3.4.3"
|
||||
@@ -18,14 +18,15 @@ lxml = "^4.9.1"
|
||||
pandas = "^1.4.3"
|
||||
playwright = "^1.24.1"
|
||||
ujson = "^5.4.0"
|
||||
Brotli = "^1.0.9"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
mypy = "^0.971"
|
||||
autopep8 = "^1.6.0"
|
||||
pylint = "^2.14.5"
|
||||
types-ujson = "^5.4.0"
|
||||
mypy = "^0.991"
|
||||
autopep8 = "^2.0.0"
|
||||
pylint = "^2.15.9"
|
||||
types-ujson = "^5.5.0"
|
||||
lxml-stubs = "^0.4.0"
|
||||
pandas-stubs = "^1.4.3"
|
||||
pandas-stubs = "^1.5.2"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
|
||||
Reference in New Issue
Block a user