新增查分段指令 (#121)

* 忽略 .idea 文件夹

* 新增查分段指令

* 修复未知段位可能导致程序无限死循环的异常

* 我 keys 呢

* 🎨 整理 import

* 🎨 规范引号

* 🐛 记得await

* ️ 一些优化

* 🐛 多个逗号

* 🚨 修正typing hint

* 🐛 返回值是 tuple 哦

* 🐛 少个逗号

* 🐛 你得删前缀啊

* 🐛 怎么能用 is 呢

* 🐛 记得await

* 试图匹配查询格式

* 💬 小改返回消息样式

* 🎨 修改变量名

* 🐛 修复查询大写问题

* 🐛 使用 get_db 获取数据库对象

---------

Co-authored-by: scdhh <wallfjjd@gmail.com>
This commit is contained in:
渣渣120
2023-05-30 04:57:31 +08:00
committed by GitHub
parent 972f7e90d2
commit 32d34c93d7
5 changed files with 163 additions and 11 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.idea
dist
test*
Untitled*

View File

@@ -8,7 +8,7 @@ from .processor import Processor
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
IORank = on_regex(pattern=r'^io段位|^iorank', flags=I, permission=GROUP)
@IOBind.handle()
async def _(event: MessageEvent, matcher: Matcher):
@@ -30,3 +30,11 @@ async def _(event: MessageEvent, matcher: Matcher):
qq_number=event.sender.user_id
)
)
@IORank.handle()
async def _(event: MessageEvent, matcher: Matcher):
await matcher.finish(
await Processor.handle_rank(
message=event.raw_message
)
)

View File

@@ -1,3 +1,4 @@
import math
from asyncio import gather
from typing import Any
@@ -6,7 +7,8 @@ from nonebot.log import logger
from ...utils.database import DataBase
from ...utils.message_analyzer import (
handle_bind_message,
handle_stats_query_message
handle_rank_message,
handle_stats_query_message,
)
from .request import Request
@@ -43,6 +45,82 @@ class Processor:
logger.error('预期外行为, 请上报GitHub')
return '出现预期外行为,请查看后台信息'
@classmethod
async def handle_rank(cls, message: str):
query_rank = await handle_rank_message(message)
rank_info = await DataBase.query_rank_info_today(rank=query_rank.lower())
if rank_info is None:
ranks_percentiles = {
'x': 1,
'u': 5,
'ss': 11,
's+': 17,
's': 23,
's-': 30,
'a+': 38,
'a': 46,
'a-': 54,
'b+': 62,
'b': 70,
'b-': 78,
'c+': 84,
'c': 90,
'c-': 95,
'd+': 97.5,
'd': 100,
}
if query_rank.lower() not in (i for i in ranks_percentiles.keys()):
return '未知段位'
result = await Request.request('https://ch.tetr.io/api/users/lists/league/all')
users: list = result[2]['data']['users']
def avg(rank_users: list, column: str, playercount: int | None = None) -> float:
return sum(i['league'][column] for i in rank_users) / (playercount or len(rank_users))
for rank, percentile in ranks_percentiles.items():
offset = math.floor((percentile / 100) * len(users)) - 1
tr = users[offset]['league']['rating']
rank_users = list(filter(lambda x: x['league']['rank'] == rank, users))
playercount = len(rank_users)
avg_apm = avg(rank_users, 'apm', playercount)
avg_pps = avg(rank_users, 'pps', playercount)
avg_vs = avg(rank_users, 'vs', playercount)
await DataBase.write_rank_info_today(
rank=rank,
trline=tr,
playercount=playercount,
avgapm=avg_apm,
avgpps=avg_pps,
avgvs=avg_vs
)
return await Processor.handle_rank(message=message)
else:
avg_apm = round(rank_info[3], 2)
avg_pps = round(rank_info[4], 2)
avg_vs = round(rank_info[5], 2)
avg_lpm = round((avg_pps * 24), 2)
avg_apl = round((avg_apm / avg_lpm), 2)
avg_adpm = round((avg_vs * 0.6), 2)
avg_adpl = round((avg_adpm / avg_lpm), 2)
message = f'{query_rank.upper()} 段, 分数线 {round(rank_info[1], 2)} TR, {rank_info[2]} 名玩家'
message += f'\n对比昨日趋势: {rank_info[0]}'
message += '\n'
message += f"\nL'PM: {avg_lpm} ( {avg_pps} pps )"
message += f'\nAPM: {avg_apm} ( x{avg_apl} )'
message += f'\nADPM: {avg_adpm} ( x{avg_adpl} ) ( {avg_vs}vs )'
message += '\n'
message += f'\n数据更新时间: {rank_info[6]}'
return message
@classmethod
async def handle_query(cls, message: str, qq_number: int | None):
'''处理查询消息'''
@@ -69,9 +147,9 @@ class Processor:
@classmethod
async def get_user_data(
cls,
user_name: str | None = None,
user_id: str | None = None
cls,
user_name: str | None = None,
user_id: str | None = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据'''
if user_name is not None and user_id is None:
@@ -84,9 +162,9 @@ class Processor:
@classmethod
async def get_solo_data(
cls,
user_name: str | None = None,
user_id: str | None = None
cls,
user_name: str | None = None,
user_id: str | None = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取Solo数据'''
if user_name is not None and user_id is None:
@@ -165,9 +243,9 @@ class Processor:
@classmethod
async def generate_message(
cls,
user_name: str | None = None,
user_id: str | None = None
cls,
user_name: str | None = None,
user_id: str | None = None
) -> str:
'''生成消息'''
user_data, solo_data = await gather(

View File

@@ -1,3 +1,4 @@
import datetime
import os
from asyncio import gather
from sqlite3 import Connection, connect
@@ -39,10 +40,61 @@ class DataBase():
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS IORANK
(RANK VARCHAR(2) NOT NULL,
TRENDING CHAR(1) NOT NULL,
TRLINE FLOAT NOT NULL,
PLAYERCOUNT INTEGER NOT NULL,
AVGAPM FLOAT NOT NULL,
AVGPPS FLOAT NOT NULL,
ARGVS FLOAT NOT NULL,
DATE TEXT NOT NULL)''')
cls._db.commit()
logger.info('数据库初始化完成')
return cls._db
@classmethod
async def query_rank_info_today(cls, rank: str) -> list | None:
'''查询段位信息'''
db = await cls._get_db()
cursor = db.cursor()
cursor.execute('''SELECT TRENDING, TRLINE, PLAYERCOUNT, AVGAPM, AVGPPS, ARGVS, DATE
FROM IORANK
WHERE RANK = ? AND DATE = ?''', (rank, datetime.date.today()))
result = cursor.fetchone()
if result is None:
return None
return list(result)
@classmethod
async def write_rank_info_today(cls, rank: str, trline: int, playercount: int, avgapm: float, avgpps: float, avgvs: float):
'''写入段位信息'''
db = await cls._get_db()
cursor = db.cursor()
cursor.execute('''SELECT TRLINE
FROM IORANK
WHERE RANK = ? AND DATE = ?''', (rank, datetime.date.today()))
result = cursor.fetchone()
if result is None:
trending = '?'
else:
if result[0] > trline:
trending = ''
else:
trending = ''
cursor.execute('''INSERT INTO IORANK
(RANK, TRENDING, TRLINE, PLAYERCOUNT, AVGAPM, AVGPPS, ARGVS, DATE)
VALUES (?,?,?,?,?,?,?,?)''',
(rank, trending, trline, playercount, avgapm, avgpps, avgvs, datetime.date.today()))
db.commit()
@classmethod
async def _get_db(cls) -> Connection:
'''获取数据库对象'''

View File

@@ -53,6 +53,19 @@ async def handle_stats_query_message(message: str, game_type: str) -> tuple[str
return await check_name(message, game_type)
async def handle_rank_message(message: str) -> str:
_cmd_aliases = ['io段位', 'iorank']
# 剔除命令前缀
for i in _cmd_aliases:
if match(rf'(?i){i}', message):
message = sub(rf'(?i){i}', '', message)
message = message.strip()
break
else:
raise ValueError('预期外行为, 请上报GitHub')
return message
async def check_name(name: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]'''
if game_type == 'IO':