mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
Compare commits
81 Commits
0.3.2
...
0.3.3-fix1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe412d5acd | ||
|
|
ac8d332e69 | ||
| 2015917ca0 | |||
|
|
264c5c04e9 | ||
| 14d646a2d0 | |||
|
|
d2a33ec5f8 | ||
|
|
c4c0faf64b | ||
|
|
a8e769866f | ||
|
|
38595d46b9 | ||
|
|
bd08166f45 | ||
|
|
5b1e2a1e28 | ||
|
|
903fe34302 | ||
|
|
0ea597809f | ||
|
|
58b4c8ab9a | ||
|
|
e82a9d4747 | ||
|
|
6125190fe0 | ||
|
|
402c30b32a | ||
|
|
4d51afa726 | ||
|
|
4af6170470 | ||
|
|
98244eed60 | ||
|
|
2e222ed6f5 | ||
|
|
c224a0f880 | ||
|
|
7df6cb3396 | ||
|
|
ee9da6a5e8 | ||
|
|
cc3a53aa3b | ||
|
|
ea767291a2 | ||
|
|
cffe472ad3 | ||
|
|
6e6b16c6f3 | ||
|
|
bdb0139cf1 | ||
|
|
ad9c7b24a6 | ||
|
|
6285900e16 | ||
|
|
a1d882c122 | ||
|
|
f6d5be7cc0 | ||
| 09e18c526a | |||
| af8fa05394 | |||
|
|
e099beeb6b | ||
|
|
5f8f8b67b2 | ||
|
|
a0b6c85637 | ||
|
|
163f0e139a | ||
|
|
9a02eb5cce | ||
|
|
2f9849c74e | ||
|
|
650dfb0669 | ||
|
|
a0e0ff2fa3 | ||
|
|
f5188594f1 | ||
|
|
26374a96fb | ||
|
|
67ccf465ce | ||
|
|
6bcde5cf74 | ||
|
|
1fffa560fb | ||
|
|
d995bfe74f | ||
|
|
9437d99ec4 | ||
|
|
4aed7862a3 | ||
|
|
8ee6648d2a | ||
|
|
1e0ca7149d | ||
|
|
dafb4352fa | ||
|
|
ae74cdbc0c | ||
|
|
364717f049 | ||
|
|
a05260ba4a | ||
|
|
a80d9586ff | ||
| d0a0baa275 | |||
|
|
625fc895ea | ||
|
|
3b12d74193 | ||
|
|
359f3964fb | ||
|
|
e655737935 | ||
|
|
98cdf6a529 | ||
|
|
53379eb951 | ||
|
|
4567764079 | ||
|
|
60d6e487d2 | ||
|
|
09c57f52ea | ||
|
|
bab4b06de0 | ||
|
|
175daafe5d | ||
|
|
a2d8bc55cc | ||
|
|
d1a2a20e13 | ||
|
|
e32a32dee0 | ||
|
|
540b01649d | ||
|
|
f224af7aa4 | ||
|
|
c5db3fef3f | ||
|
|
884cc08d77 | ||
|
|
b30d5e008d | ||
|
|
0a3e265272 | ||
|
|
b32c2f1895 | ||
| d62c53031e |
1
.github/dependabot.yml
vendored
1
.github/dependabot.yml
vendored
@@ -7,5 +7,6 @@ version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
target-branch: "dev"
|
||||
schedule:
|
||||
interval: "daily"
|
||||
|
||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -2,3 +2,4 @@ dist
|
||||
test*
|
||||
Untitled*
|
||||
*copy*
|
||||
.vscode
|
||||
|
||||
@@ -1,360 +0,0 @@
|
||||
import os
|
||||
from asyncio import gather
|
||||
from re import I
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from nonebot import get_driver, on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.log import logger
|
||||
from nonebot.matcher import Matcher
|
||||
from playwright.async_api import Browser, Response, async_playwright
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
|
||||
from ..utils.config import Config
|
||||
from ..utils.database import DataBase
|
||||
from ..utils.message_analyzer import (
|
||||
handle_bind_message,
|
||||
handle_stats_query_message
|
||||
)
|
||||
|
||||
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
config = Config.parse_obj(get_driver().config)
|
||||
|
||||
|
||||
@IOBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
if decoded_message[0] == 'ID':
|
||||
user_id_stats = await check_user_id(decoded_message[1][1])
|
||||
if user_id_stats[0] is False:
|
||||
await matcher.finish(user_id_stats[1])
|
||||
else:
|
||||
user_id = decoded_message[1][1]
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await get_user_data(user_name=decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
await matcher.finish('用户信息请求失败')
|
||||
elif user_data[1] is False:
|
||||
await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}')
|
||||
else:
|
||||
user_id = await get_user_id(user_data[2])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=event.sender.user_id,
|
||||
user=user_id,
|
||||
game_type='IO'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@IOStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
await matcher.finish(decoded_message[1][0])
|
||||
elif decoded_message[0] == 'AT':
|
||||
if event.is_tome() is True:
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ME':
|
||||
if event.sender.user_id is None:
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败, 请联系bot主人')
|
||||
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='IO')
|
||||
if bind_info is None:
|
||||
message = '未查询到绑定信息'
|
||||
else:
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}')
|
||||
elif decoded_message[0] == 'ID':
|
||||
message = await generate_message(user_id=decoded_message[1][1])
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
await matcher.finish(message)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await Request.init_cache()
|
||||
await Request.read_cache()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await Request.close_browser()
|
||||
|
||||
|
||||
async def get_user_data(
|
||||
user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_data_url)
|
||||
|
||||
|
||||
async def get_solo_data(
|
||||
user_name: str = None,
|
||||
user_id: str = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取Solo数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_solo_url)
|
||||
|
||||
|
||||
async def get_user_id(user_data: dict) -> str:
|
||||
'''获取用户ID'''
|
||||
return user_data['data']['user']['_id']
|
||||
|
||||
|
||||
async def check_user_id(user_id: str) -> tuple[bool, str]:
|
||||
'''检查用户ID是否有效'''
|
||||
user_data = await get_user_data(user_id=user_id)
|
||||
if user_data[0] is False:
|
||||
return False, '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return False, f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
if user_id == user_data[2]['data']['user']['_id']:
|
||||
return True, ''
|
||||
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x')
|
||||
|
||||
|
||||
async def get_league_stats(user_data: dict) -> dict[str, Any]:
|
||||
'''获取排位统计数据'''
|
||||
league = user_data['data']['user']['league']
|
||||
league_stats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] != 0:
|
||||
league_stats['PPS'] = league['pps']
|
||||
league_stats['APM'] = league['apm']
|
||||
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper()
|
||||
if league['rating'] == -1:
|
||||
league_stats['Rank'] = None
|
||||
else:
|
||||
league_stats['Rating'] = round(league['rating'], 2)
|
||||
league_stats['Glicko'] = round(league['glicko'], 2)
|
||||
league_stats['RD'] = round(league['rd'], 2)
|
||||
league_stats['Standing'] = league['standing']
|
||||
league_stats['LPM'] = round((league['pps'] * 24), 2)
|
||||
league_stats['APL'] = round(
|
||||
(league_stats['APM'] / league_stats['LPM']), 2)
|
||||
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
|
||||
league_stats['ADPL'] = round(
|
||||
(league_stats['ADPM'] / league_stats['LPM']), 2)
|
||||
return league_stats
|
||||
|
||||
|
||||
async def get_sprint_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取40L统计数据'''
|
||||
sprint_stats = {}
|
||||
solo = solo_data['data']['records']['40l']
|
||||
if solo['record'] is not None:
|
||||
sprint_stats['Time'] = round(
|
||||
solo['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
if solo['rank'] is not None:
|
||||
sprint_stats['Rank'] = solo['rank']
|
||||
return sprint_stats
|
||||
|
||||
|
||||
async def get_blitz_stats(solo_data: dict) -> dict[str, Any]:
|
||||
'''获取Blitz统计数据'''
|
||||
blitz_stats = {}
|
||||
blitz = solo_data['data']['records']['blitz']
|
||||
if blitz['record'] is not None:
|
||||
blitz_stats['Score'] = blitz['record']['endcontext']['score']
|
||||
if blitz['rank'] is not None:
|
||||
blitz_stats['Rank'] = blitz['rank']
|
||||
return blitz_stats
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, user_id: str = None) -> str:
|
||||
'''生成消息'''
|
||||
user_data, solo_data = await gather(
|
||||
get_user_data(user_name=user_name, user_id=user_id),
|
||||
get_solo_data(user_name=user_name, user_id=user_id)
|
||||
)
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_name = user_data[2]['data']['user']['username'].upper()
|
||||
league_stats = await get_league_stats(user_data[2])
|
||||
message = ''
|
||||
if not league_stats:
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if league_stats['Rank'] is None:
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league_stats['Rank'] == 'Z':
|
||||
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
|
||||
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
|
||||
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
|
||||
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
|
||||
if league_stats["VS"] != 0:
|
||||
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
|
||||
if solo_data[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
if solo_data[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
|
||||
sprint_stats, blitz_stats = await gather(
|
||||
get_sprint_stats(solo_data[2]),
|
||||
get_blitz_stats(solo_data[2])
|
||||
)
|
||||
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
|
||||
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
|
||||
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
|
||||
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
|
||||
return message
|
||||
|
||||
|
||||
class Request:
|
||||
'''网络请求相关类'''
|
||||
_browser: Browser | None = None
|
||||
_headers: dict | None = None
|
||||
_cookies: dict | None = None
|
||||
|
||||
@classmethod
|
||||
async def _init_playwright(cls) -> Browser:
|
||||
'''初始化playwright'''
|
||||
playwright = await async_playwright().start()
|
||||
cls._browser = await playwright.firefox.launch()
|
||||
return cls._browser
|
||||
|
||||
@classmethod
|
||||
async def _get_browser(cls) -> Browser:
|
||||
'''获取浏览器对象'''
|
||||
return cls._browser or await cls._init_playwright()
|
||||
|
||||
@classmethod
|
||||
async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''用firefox硬穿五秒盾'''
|
||||
browser = await cls._get_browser()
|
||||
context = await browser.new_context()
|
||||
page = await context.new_page()
|
||||
response = await page.goto(url)
|
||||
attempts = 0
|
||||
while attempts < 60:
|
||||
attempts += 1
|
||||
text = await page.locator("body").text_content()
|
||||
if text is None:
|
||||
await page.wait_for_timeout(1000)
|
||||
continue
|
||||
if await page.title() == 'Please Wait... | Cloudflare':
|
||||
# TODO 有无人来做一个过验证码(
|
||||
break
|
||||
try:
|
||||
data = loads(text)
|
||||
except JSONDecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
assert isinstance(response, Response)
|
||||
cls._headers = await response.request.all_headers()
|
||||
cls._cookies = {i['name']: i['value'] for i in await context.cookies()}
|
||||
await cls._write_cache()
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, data['success'], data
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, False, {'error': '绕过五秒盾失败'}
|
||||
|
||||
@classmethod
|
||||
async def init_cache(cls) -> None:
|
||||
'''初始化缓存文件'''
|
||||
if not os.path.exists(os.path.dirname(config.cache_path)):
|
||||
os.makedirs(os.path.dirname(config.cache_path))
|
||||
if not os.path.exists(config.cache_path):
|
||||
with open(file=config.cache_path, mode='w', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def read_cache(cls) -> None:
|
||||
'''读取缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r', encoding='UTF-8') as file:
|
||||
json = loads(file.read())
|
||||
cls._headers = json['headers']
|
||||
cls._cookies = json['cookies']
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def _write_cache(cls) -> None:
|
||||
'''写入缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession(cookies=cls._cookies) as session:
|
||||
async with session.get(url, headers=cls._headers) as resp:
|
||||
data = await resp.json()
|
||||
return True, data['success'], data
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return False, False, {}
|
||||
except aiohttp.client_exceptions.ContentTypeError:
|
||||
return await cls._anti_cloudflare(url)
|
||||
|
||||
@classmethod
|
||||
async def close_browser(cls) -> None:
|
||||
'''关闭浏览器对象'''
|
||||
if isinstance(cls._browser, Browser):
|
||||
await cls._browser.close()
|
||||
@@ -0,0 +1,32 @@
|
||||
from re import I
|
||||
|
||||
from nonebot import on_regex
|
||||
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
|
||||
from nonebot.matcher import Matcher
|
||||
|
||||
from .processor import Processor
|
||||
|
||||
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
|
||||
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
|
||||
|
||||
|
||||
@IOBind.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
await matcher.finish(
|
||||
await Processor.handle_bind(
|
||||
message=event.raw_message,
|
||||
qq_number=event.sender.user_id
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@IOStats.handle()
|
||||
async def _(event: MessageEvent, matcher: Matcher):
|
||||
if event.is_tome():
|
||||
await matcher.finish('不能查询bot的信息')
|
||||
await matcher.finish(
|
||||
await Processor.handle_query(
|
||||
message=event.raw_message,
|
||||
qq_number=event.sender.user_id
|
||||
)
|
||||
)
|
||||
@@ -0,0 +1,211 @@
|
||||
from asyncio import gather
|
||||
from typing import Any
|
||||
|
||||
from nonebot.log import logger
|
||||
|
||||
from ...utils.database import DataBase
|
||||
from ...utils.message_analyzer import (
|
||||
handle_bind_message,
|
||||
handle_stats_query_message
|
||||
)
|
||||
from .request import Request
|
||||
|
||||
|
||||
class Processor:
|
||||
@classmethod
|
||||
async def handle_bind(cls, message: str, qq_number: int | None) -> str:
|
||||
'''处理绑定消息'''
|
||||
decoded_message = await handle_bind_message(message=message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
return decoded_message[1][0]
|
||||
if decoded_message[0] == 'ID':
|
||||
user_id_stats = await cls.check_user_id(decoded_message[1][1])
|
||||
if user_id_stats[0] is False:
|
||||
return user_id_stats[1]
|
||||
user_id = decoded_message[1][1]
|
||||
elif decoded_message[0] == 'Name':
|
||||
user_data = await cls.get_user_data(user_name=decoded_message[1][1])
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_id = await cls.get_user_id(user_data[2])
|
||||
if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
return '获取QQ号失败'
|
||||
return (
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=qq_number,
|
||||
user=user_id,
|
||||
game_type='IO'
|
||||
)
|
||||
)
|
||||
logger.error('预期外行为, 请上报GitHub')
|
||||
return '出现预期外行为,请查看后台信息'
|
||||
|
||||
@classmethod
|
||||
async def handle_query(cls, message: str, qq_number: int | None):
|
||||
'''处理查询消息'''
|
||||
decoded_message = await handle_stats_query_message(message=message, game_type='IO')
|
||||
if decoded_message[0] is None:
|
||||
return decoded_message[1][0]
|
||||
if decoded_message[0] == 'AT': # 在入口处判断是否@bot本身
|
||||
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
|
||||
if bind_info is None:
|
||||
return '未查询到绑定信息'
|
||||
return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}'
|
||||
if decoded_message[0] == 'ME':
|
||||
if qq_number is None:
|
||||
logger.error('获取QQ号失败')
|
||||
return '获取QQ号失败, 请联系bot主人'
|
||||
bind_info = await DataBase.query_bind_info(qq_number=qq_number, game_type='IO')
|
||||
if bind_info is None:
|
||||
return '未查询到绑定信息'
|
||||
return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}'
|
||||
if decoded_message[0] == 'ID':
|
||||
return await Processor.generate_message(user_id=decoded_message[1][1])
|
||||
if decoded_message[0] == 'Name':
|
||||
return await Processor.generate_message(user_name=decoded_message[1][1])
|
||||
|
||||
@classmethod
|
||||
async def get_user_data(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_name}'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_data_url = f'https://ch.tetr.io/api/users/{user_id}'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_data_url)
|
||||
|
||||
@classmethod
|
||||
async def get_solo_data(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取Solo数据'''
|
||||
if user_name is not None and user_id is None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records'
|
||||
elif user_name is None and user_id is not None:
|
||||
user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
return await Request.request(user_solo_url)
|
||||
|
||||
@classmethod
|
||||
async def get_user_id(cls, user_data: dict) -> str:
|
||||
'''获取用户ID'''
|
||||
return user_data['data']['user']['_id']
|
||||
|
||||
@classmethod
|
||||
async def check_user_id(cls, user_id: str) -> tuple[bool, str]:
|
||||
'''检查用户ID是否有效 返回值为tuple[bool, message]'''
|
||||
user_data = await cls.get_user_data(user_id=user_id)
|
||||
if user_data[0] is False:
|
||||
return False, '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return False, f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
if user_id == user_data[2]['data']['user']['_id']:
|
||||
return True, ''
|
||||
raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x')
|
||||
|
||||
@classmethod
|
||||
async def get_league_stats(cls, user_data: dict) -> dict[str, Any]:
|
||||
'''获取排位统计数据'''
|
||||
league = user_data['data']['user']['league']
|
||||
league_stats: dict[str, Any] = {}
|
||||
if league['gamesplayed'] != 0:
|
||||
league_stats['PPS'] = league['pps']
|
||||
league_stats['APM'] = league['apm']
|
||||
league_stats['VS'] = 0 if league['vs'] is None else league['vs']
|
||||
league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper(
|
||||
)
|
||||
if league['rating'] == -1:
|
||||
league_stats['Rank'] = None
|
||||
else:
|
||||
league_stats['Rating'] = round(league['rating'], 2)
|
||||
league_stats['Glicko'] = round(league['glicko'], 2)
|
||||
league_stats['RD'] = round(league['rd'], 2)
|
||||
league_stats['Standing'] = league['standing']
|
||||
league_stats['LPM'] = round((league['pps'] * 24), 2)
|
||||
league_stats['APL'] = round(
|
||||
(league_stats['APM'] / league_stats['LPM']), 2)
|
||||
league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2)
|
||||
league_stats['ADPL'] = round(
|
||||
(league_stats['ADPM'] / league_stats['LPM']), 2)
|
||||
return league_stats
|
||||
|
||||
@classmethod
|
||||
async def get_sprint_stats(cls, solo_data: dict) -> dict[str, Any]:
|
||||
'''获取40L统计数据'''
|
||||
sprint_stats = {}
|
||||
solo = solo_data['data']['records']['40l']
|
||||
if solo['record'] is not None:
|
||||
sprint_stats['Time'] = round(
|
||||
solo['record']['endcontext']['finalTime'] / 1000, 2)
|
||||
if solo['rank'] is not None:
|
||||
sprint_stats['Rank'] = solo['rank']
|
||||
return sprint_stats
|
||||
|
||||
@classmethod
|
||||
async def get_blitz_stats(cls, solo_data: dict) -> dict[str, Any]:
|
||||
'''获取Blitz统计数据'''
|
||||
blitz_stats = {}
|
||||
blitz = solo_data['data']['records']['blitz']
|
||||
if blitz['record'] is not None:
|
||||
blitz_stats['Score'] = blitz['record']['endcontext']['score']
|
||||
if blitz['rank'] is not None:
|
||||
blitz_stats['Rank'] = blitz['rank']
|
||||
return blitz_stats
|
||||
|
||||
@classmethod
|
||||
async def generate_message(
|
||||
cls,
|
||||
user_name: str | None = None,
|
||||
user_id: str | None = None
|
||||
) -> str:
|
||||
'''生成消息'''
|
||||
user_data, solo_data = await gather(
|
||||
cls.get_user_data(user_name=user_name, user_id=user_id),
|
||||
cls.get_solo_data(user_name=user_name, user_id=user_id)
|
||||
)
|
||||
if user_data[0] is False:
|
||||
return '用户信息请求失败'
|
||||
if user_data[1] is False:
|
||||
return f'用户信息请求错误:\n{user_data[2]["error"]}'
|
||||
user_name = user_data[2]['data']['user']['username'].upper()
|
||||
league_stats = await cls.get_league_stats(user_data[2])
|
||||
message = ''
|
||||
if not league_stats:
|
||||
message += f'用户 {user_name} 没有排位统计数据'
|
||||
else:
|
||||
if league_stats['Rank'] is None:
|
||||
message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:'
|
||||
else:
|
||||
if league_stats['Rank'] == 'Z':
|
||||
message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR'
|
||||
else:
|
||||
message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})'
|
||||
message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:'
|
||||
message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )'
|
||||
message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )'
|
||||
if league_stats["VS"] != 0:
|
||||
message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )'
|
||||
if solo_data[0] is False:
|
||||
return f'{message}\nSolo统计数据请求失败'
|
||||
if solo_data[1] is False:
|
||||
return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}'
|
||||
sprint_stats, blitz_stats = await gather(
|
||||
cls.get_sprint_stats(solo_data[2]),
|
||||
cls.get_blitz_stats(solo_data[2])
|
||||
)
|
||||
message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else ''
|
||||
message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else ''
|
||||
message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else ''
|
||||
message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else ''
|
||||
return message
|
||||
@@ -0,0 +1,155 @@
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
from nonebot import get_driver
|
||||
from nonebot.log import logger
|
||||
from playwright.async_api import Browser, Response, async_playwright
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
|
||||
from ...utils.config import Config
|
||||
|
||||
driver = get_driver()
|
||||
|
||||
config = Config.parse_obj(get_driver().config)
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
async def _():
|
||||
await Request.init_cache()
|
||||
await Request.read_cache()
|
||||
|
||||
|
||||
@driver.on_shutdown
|
||||
async def _():
|
||||
await Request.close_browser()
|
||||
await Request.write_cache()
|
||||
|
||||
|
||||
class Request:
|
||||
'''网络请求相关类'''
|
||||
_browser: Browser | None = None
|
||||
_headers: dict | None = None
|
||||
_cookies: dict | None = None
|
||||
|
||||
@classmethod
|
||||
async def _init_playwright(cls) -> Browser:
|
||||
'''初始化playwright'''
|
||||
playwright = await async_playwright().start()
|
||||
cls._browser = await playwright.firefox.launch()
|
||||
return cls._browser
|
||||
|
||||
@classmethod
|
||||
async def _get_browser(cls) -> Browser:
|
||||
'''获取浏览器对象'''
|
||||
return cls._browser or await cls._init_playwright()
|
||||
|
||||
@classmethod
|
||||
async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''用firefox硬穿五秒盾'''
|
||||
browser = await cls._get_browser()
|
||||
context = await browser.new_context()
|
||||
page = await context.new_page()
|
||||
response = await page.goto(url)
|
||||
attempts = 0
|
||||
while attempts < 60:
|
||||
attempts += 1
|
||||
text = await page.locator("body").text_content()
|
||||
if text is None:
|
||||
await page.wait_for_timeout(1000)
|
||||
continue
|
||||
if await page.title() == 'Please Wait... | Cloudflare':
|
||||
# TODO 有无人来做一个过验证码(
|
||||
break
|
||||
try:
|
||||
data = loads(text)
|
||||
except JSONDecodeError:
|
||||
await page.wait_for_timeout(1000)
|
||||
else:
|
||||
assert isinstance(response, Response)
|
||||
cls._headers = await response.request.all_headers()
|
||||
try:
|
||||
cls._cookies = {i['name']: i['value'] for i in await context.cookies()}
|
||||
except KeyError:
|
||||
cls._cookies = None
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, data['success'], data
|
||||
await page.close()
|
||||
await context.close()
|
||||
return True, False, {'error': '绕过五秒盾失败'}
|
||||
|
||||
@classmethod
|
||||
async def init_cache(cls) -> None:
|
||||
'''初始化缓存文件'''
|
||||
if not os.path.exists(os.path.dirname(config.cache_path)):
|
||||
os.makedirs(os.path.dirname(config.cache_path))
|
||||
if not os.path.exists(config.cache_path):
|
||||
with open(file=config.cache_path, mode='w', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def read_cache(cls) -> None:
|
||||
'''读取缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r', encoding='UTF-8') as file:
|
||||
json = loads(file.read())
|
||||
cls._headers = json['headers']
|
||||
cls._cookies = json['cookies']
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def write_cache(cls) -> None:
|
||||
'''写入缓存文件'''
|
||||
try:
|
||||
with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file:
|
||||
file.write(
|
||||
dumps(
|
||||
{
|
||||
'headers': cls._headers,
|
||||
'cookies': cls._cookies
|
||||
}
|
||||
)
|
||||
)
|
||||
except FileNotFoundError:
|
||||
await cls.init_cache()
|
||||
except PermissionError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
except JSONDecodeError:
|
||||
os.remove(config.cache_path)
|
||||
await cls.init_cache()
|
||||
|
||||
@classmethod
|
||||
async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''请求api'''
|
||||
try:
|
||||
async with aiohttp.ClientSession(cookies=cls._cookies) as session:
|
||||
async with session.get(url, headers=cls._headers) as resp:
|
||||
data = await resp.json()
|
||||
return True, data['success'], data
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return False, False, {}
|
||||
except aiohttp.client_exceptions.ContentTypeError: # type: ignore
|
||||
return await cls._anti_cloudflare(url)
|
||||
|
||||
@classmethod
|
||||
async def close_browser(cls) -> None:
|
||||
'''关闭浏览器对象'''
|
||||
if isinstance(cls._browser, Browser):
|
||||
await cls._browser.close()
|
||||
@@ -33,16 +33,16 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
if await check_user(user_data[1]) is False:
|
||||
await matcher.finish('用户不存在')
|
||||
user_name = await get_user_name(user_data[1])
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=event.sender.user_id,
|
||||
user=user_name,
|
||||
game_type='TOP'
|
||||
)
|
||||
)
|
||||
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是(
|
||||
logger.error('获取QQ号失败')
|
||||
await matcher.finish('获取QQ号失败')
|
||||
await matcher.finish(
|
||||
await DataBase.write_bind_info(
|
||||
qq_number=event.sender.user_id,
|
||||
user=user_name,
|
||||
game_type='TOP'
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@TopStats.handle()
|
||||
@@ -69,6 +69,8 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(decoded_message[1][1])
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
await matcher.finish(message)
|
||||
|
||||
|
||||
@@ -79,7 +81,7 @@ async def get_user_data(user_name: str) -> tuple[bool, str]:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(url) as resp:
|
||||
return True, await resp.text()
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(error)
|
||||
return False, ''
|
||||
|
||||
|
||||
@@ -30,6 +30,8 @@ async def _(event: MessageEvent, matcher: Matcher):
|
||||
message = await generate_message(tea_id=event.sender.user_id)
|
||||
elif decoded_message[0] == 'Name':
|
||||
message = await generate_message(user_name=decoded_message[1][1])
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
await matcher.finish(message)
|
||||
|
||||
|
||||
@@ -40,14 +42,14 @@ async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
|
||||
async with session.get(url) as resp:
|
||||
data = await resp.json()
|
||||
return True, data['success'], data
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error:
|
||||
except aiohttp.client_exceptions.ClientConnectorError as error: # type: ignore
|
||||
logger.error(f'请求错误\n{error}')
|
||||
return False, False, {}
|
||||
|
||||
|
||||
async def get_user_info(
|
||||
user_name: str = None,
|
||||
tea_id: int = None
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户信息'''
|
||||
if user_name is not None and tea_id is None:
|
||||
@@ -60,8 +62,8 @@ async def get_user_info(
|
||||
|
||||
|
||||
async def get_user_data(
|
||||
user_name: str = None,
|
||||
tea_id: int = None,
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None,
|
||||
other_parameter: str = ''
|
||||
) -> tuple[bool, bool, dict[str, Any]]:
|
||||
'''获取用户数据'''
|
||||
@@ -77,8 +79,8 @@ async def get_user_data(
|
||||
async def get_rank_stats(user_info: dict) -> dict[str, float]:
|
||||
'''获取Rank数据'''
|
||||
data = user_info['data']
|
||||
rank_stats = {}
|
||||
if int(data['rankedGames']) != 0:
|
||||
rank_stats = {}
|
||||
rank_stats['Rating'] = round(float(data['ratingNow']), 2)
|
||||
rank_stats['RD'] = round(float(data['rdNow']), 2)
|
||||
rank_stats['Vol'] = round(float(data['volNow']), 3)
|
||||
@@ -87,8 +89,8 @@ async def get_rank_stats(user_info: dict) -> dict[str, float]:
|
||||
|
||||
async def get_game_data(user_data: dict) -> dict[str, int | float]:
|
||||
'''获取游戏数据'''
|
||||
game_data: dict[str, int | float] = {}
|
||||
if user_data['data'] != []:
|
||||
game_data: dict[str, int | float] = {}
|
||||
weighted_total_lpm = weighted_total_apm = weighted_total_adpm = total_time = num = 0
|
||||
for i in user_data['data']:
|
||||
# 排除单人局和时间为0的游戏
|
||||
@@ -136,7 +138,10 @@ async def get_pb_data(user_info: dict) -> dict[str, float | str]:
|
||||
return pb_data
|
||||
|
||||
|
||||
async def generate_message(user_name: str = None, tea_id: int = None) -> str:
|
||||
async def generate_message(
|
||||
user_name: str | None = None,
|
||||
tea_id: int | None = None
|
||||
) -> str:
|
||||
'''生成消息'''
|
||||
user_info, user_data = await gather(
|
||||
get_user_info(user_name=user_name, tea_id=tea_id),
|
||||
|
||||
@@ -76,6 +76,8 @@ class DataBase():
|
||||
cursor.execute(
|
||||
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
|
||||
message = '绑定成功'
|
||||
else:
|
||||
raise ValueError('预期外行为, 请上报GitHub')
|
||||
db.commit()
|
||||
return message
|
||||
|
||||
|
||||
3027
poetry.lock
generated
3027
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "nonebot-plugin-tetris-stats"
|
||||
version = "0.3.2"
|
||||
version = "0.3.3"
|
||||
description = "一个基于nonebot2的用于查询TETRIS相关游戏玩家数据的插件"
|
||||
authors = ["scdhh <wallfjjd@gmail.com>"]
|
||||
readme = "README.md"
|
||||
@@ -9,7 +9,7 @@ repository = "https://github.com/shoucandanghehe/nonebot-plugin-tetris-stats"
|
||||
license = "MIT"
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.10,<3.11"
|
||||
python = "^3.10,<3.12"
|
||||
nonebot-adapter-onebot = "^2.0.0-beta.1"
|
||||
aiohttp = "^3.8.1"
|
||||
asyncio = "^3.4.3"
|
||||
@@ -21,12 +21,12 @@ ujson = "^5.4.0"
|
||||
Brotli = "^1.0.9"
|
||||
|
||||
[tool.poetry.dev-dependencies]
|
||||
mypy = "^0.971"
|
||||
autopep8 = "^1.7.0"
|
||||
pylint = "^2.14.5"
|
||||
types-ujson = "^5.4.0"
|
||||
mypy = "^0.991"
|
||||
autopep8 = "^2.0.0"
|
||||
pylint = "^2.15.9"
|
||||
types-ujson = "^5.5.0"
|
||||
lxml-stubs = "^0.4.0"
|
||||
pandas-stubs = "^1.4.3"
|
||||
pandas-stubs = "^1.5.2"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
|
||||
Reference in New Issue
Block a user