整理代码

This commit is contained in:
2022-08-09 02:50:06 +08:00
parent b320a23c77
commit d3258c2bb7
4 changed files with 67 additions and 44 deletions

View File

@@ -1,3 +1,4 @@
from asyncio import gather
from re import I from re import I
from typing import Any from typing import Any
@@ -35,7 +36,13 @@ async def _(event: MessageEvent, matcher: Matcher):
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是 if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败') logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败') await matcher.finish('获取QQ号失败')
await matcher.finish(await DataBase.write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP')) await matcher.finish(
await DataBase.write_bind_info(
qq_number=event.sender.user_id,
user=user_name,
game_type='TOP'
)
)
@TopStats.handle() @TopStats.handle()
@@ -45,7 +52,7 @@ async def _(event: MessageEvent, matcher: Matcher):
await matcher.finish(decoded_message[1][0]) await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT': elif decoded_message[0] == 'AT':
if event.is_tome() is True: if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息') await matcher.finish('不能查询bot的信息')
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='TOP') bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
if bind_info is None: if bind_info is None:
message = '未查询到绑定信息' message = '未查询到绑定信息'
@@ -62,7 +69,7 @@ async def _(event: MessageEvent, matcher: Matcher):
message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}') message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(bind_info)}')
elif decoded_message[0] == 'Name': elif decoded_message[0] == 'Name':
message = await generate_message(decoded_message[1][1]) message = await generate_message(decoded_message[1][1])
await matcher.finish(message=message) await matcher.finish(message)
async def get_user_data(user_name: str) -> tuple[bool, str]: async def get_user_data(user_name: str) -> tuple[bool, str]:
@@ -71,10 +78,10 @@ async def get_user_data(user_name: str) -> tuple[bool, str]:
try: try:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url) as resp: async with session.get(url) as resp:
return (True, await resp.text()) return True, await resp.text()
except aiohttp.client_exceptions.ClientConnectorError as error: except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(error) logger.error(error)
return (False, '') return False, ''
async def check_user(user_data: str) -> bool: async def check_user(user_data: str) -> bool:
@@ -156,8 +163,10 @@ async def generate_message(user_name: str) -> str:
return '用户信息请求失败' return '用户信息请求失败'
if await check_user(user_data[1]) is False: if await check_user(user_data[1]) is False:
return '用户不存在' return '用户不存在'
user_name = await get_user_name(user_data[1]) user_name, game_stats = await gather(
game_stats = await get_game_stats(user_data[1]) get_user_name(user_data[1]),
get_game_stats(user_data[1])
)
message = '' message = ''
if game_stats['24H'] and game_stats['All']: if game_stats['24H'] and game_stats['All']:
message += f'用户 {user_name} 24小时内统计数据为: ' message += f'用户 {user_name} 24小时内统计数据为: '

View File

@@ -10,8 +10,11 @@ from nonebot.matcher import Matcher
from ..utils.message_analyzer import handle_stats_query_message from ..utils.message_analyzer import handle_stats_query_message
TOSStats = on_regex(pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats', TOSStats = on_regex(
flags=I, permission=GROUP) pattern=r'^tos查|^tostats|^tosstats|^茶服查|^茶服stats',
flags=I,
permission=GROUP
)
@TOSStats.handle() @TOSStats.handle()
@@ -21,13 +24,13 @@ async def _(event: MessageEvent, matcher: Matcher):
await matcher.finish(decoded_message[1][0]) await matcher.finish(decoded_message[1][0])
elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ': elif decoded_message[0] == 'AT' or decoded_message[0] == 'QQ':
if decoded_message[1][1] == event.self_id: if decoded_message[1][1] == event.self_id:
await matcher.finish(message='不能查询bot的信息') await matcher.finish('不能查询bot的信息')
message = await generate_message(tea_id=decoded_message[1][1]) message = await generate_message(tea_id=decoded_message[1][1])
elif decoded_message[0] == 'ME': elif decoded_message[0] == 'ME':
message = await generate_message(tea_id=event.sender.user_id) message = await generate_message(tea_id=event.sender.user_id)
elif decoded_message[0] == 'Name': elif decoded_message[0] == 'Name':
message = await generate_message(user_name=decoded_message[1][1]) message = await generate_message(user_name=decoded_message[1][1])
await matcher.finish(message=message) await matcher.finish(message)
async def request(url: str) -> tuple[bool, bool, dict[str, Any]]: async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
@@ -36,15 +39,16 @@ async def request(url: str) -> tuple[bool, bool, dict[str, Any]]:
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
async with session.get(url) as resp: async with session.get(url) as resp:
data = await resp.json() data = await resp.json()
return (True, data['success'], data) return True, data['success'], data
except aiohttp.client_exceptions.ClientConnectorError as error: except aiohttp.client_exceptions.ClientConnectorError as error:
logger.error(f'请求错误\n{error}') logger.error(f'请求错误\n{error}')
return (False, False, {}) return False, False, {}
async def get_user_info(user_name: str = None, async def get_user_info(
tea_id: int = None user_name: str = None,
) -> tuple[bool, bool, dict[str, Any]]: tea_id: int = None
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户信息''' '''获取用户信息'''
if user_name is not None and tea_id is None: if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}' user_data_url = f'https://teatube.cn:8888/getUsernameInfo?username={user_name}'
@@ -52,13 +56,14 @@ async def get_user_info(user_name: str = None,
user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}' user_data_url = f'https://teatube.cn:8888/getTeaIdInfo?teaId={tea_id}'
else: else:
raise ValueError('预期外行为, 请上报GitHub') raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_data_url) return await request(user_data_url)
async def get_user_data(user_name: str = None, async def get_user_data(
tea_id: int = None, user_name: str = None,
other_parameter: str = '' tea_id: int = None,
) -> tuple[bool, bool, dict[str, Any]]: other_parameter: str = ''
) -> tuple[bool, bool, dict[str, Any]]:
'''获取用户数据''' '''获取用户数据'''
if user_name is not None and tea_id is None: if user_name is not None and tea_id is None:
user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}' user_data_url = f'https://teatube.cn:8888/getProfile?id={user_name}{other_parameter}'
@@ -66,7 +71,7 @@ async def get_user_data(user_name: str = None,
user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}' user_data_url = f'https://teatube.cn:8888/getProfile?id={tea_id}{other_parameter}'
else: else:
raise ValueError('预期外行为, 请上报GitHub') raise ValueError('预期外行为, 请上报GitHub')
return await request(url=user_data_url) return await request(user_data_url)
async def get_rank_stats(user_info: dict) -> dict[str, float]: async def get_rank_stats(user_info: dict) -> dict[str, float]:
@@ -133,13 +138,18 @@ async def get_pb_data(user_info: dict) -> dict[str, float | str]:
async def generate_message(user_name: str = None, tea_id: int = None) -> str: async def generate_message(user_name: str = None, tea_id: int = None) -> str:
'''生成消息''' '''生成消息'''
user_info, user_data = await gather(get_user_info(user_name=user_name, tea_id=tea_id), user_info, user_data = await gather(
get_user_data(user_name=user_name, tea_id=tea_id)) get_user_info(user_name=user_name, tea_id=tea_id),
get_user_data(user_name=user_name, tea_id=tea_id)
)
if user_info[0] is False: if user_info[0] is False:
return '用户信息请求失败' return '用户信息请求失败'
if user_info[1] is False: if user_info[1] is False:
return f'用户信息请求错误:\n{user_info[2]["error"]}' return f'用户信息请求错误:\n{user_info[2]["error"]}'
rank_stats, pb_data = await gather(get_rank_stats(user_info[2]), get_pb_data(user_info[2])) rank_stats, pb_data = await gather(
get_rank_stats(user_info[2]),
get_pb_data(user_info[2])
)
message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) ' message = f'用户 {user_info[2]["data"]["name"]} ({user_info[2]["data"]["teaId"]}) '
if not rank_stats: if not rank_stats:
message += '暂无段位统计数据' message += '暂无段位统计数据'

View File

@@ -2,5 +2,6 @@ from pydantic import BaseModel
class Config(BaseModel): class Config(BaseModel):
'''配置类'''
cache_path: str = 'cache/nonebot_plugin_tetris_stats/cache' cache_path: str = 'cache/nonebot_plugin_tetris_stats/cache'
db_path: str = 'data/nonebot_plugin_tetris_stats/data.db' db_path: str = 'data/nonebot_plugin_tetris_stats/data.db'

View File

@@ -14,7 +14,7 @@ async def handle_bind_message(message: str, game_type: str) -> tuple[str | None,
else: else:
raise ValueError('预期外行为, 请上报GitHub') raise ValueError('预期外行为, 请上报GitHub')
if message == '' or message.isspace(): if message == '' or message.isspace():
return (None, ('用户名为空', None)) return None, ('用户名为空', None)
return await check_name(message, game_type) return await check_name(message, game_type)
@@ -23,11 +23,13 @@ async def handle_stats_query_message(message: str, game_type: str) -> tuple[str
_cmd_aliases = {'IO': ['io查', 'iostats'], _cmd_aliases = {'IO': ['io查', 'iostats'],
'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'], 'TOS': ['tos查', 'tostats', 'tosstats', '茶服查', '茶服stats'],
'TOP': ['top查', 'topstats']} 'TOP': ['top查', 'topstats']}
_me = ['', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷', '鄙人', '寡人', _me = [
'小生', '贫僧', '本人', '', '', '', '', '', 'me', '洒家', '在下', '', '', '', '自己', '我等', '卑人', '', '老身', '', '老娘', '本姑娘', '本大爷', '鄙人', '',
'本小姐', '老夫', '老子', '', '本尊', '', '拙者', '', '', '自分', '吾輩', '我輩', '', '小生', '贫僧', '本人', '', '', '', '', '', 'me', '洒家', '在下', '', '人家',
'己等', '俺等', '此方', '', '', '劳资', '本宝宝', '', '本喵', 'watashi', 'i', 'myself', '本小姐', '老夫', '老子', '', '本尊', '', '拙者', '', '', '自分', '吾輩', '我輩', '',
'self', 'oneself'] '己等', '俺等', '此方', '', '', '劳资', '本宝宝', '', '本喵', 'watashi', 'i', 'myself',
'self', 'oneself'
]
# 剔除命令前缀 # 剔除命令前缀
for i in _cmd_aliases[game_type]: for i in _cmd_aliases[game_type]:
if match(rf'(?i){i}', message): if match(rf'(?i){i}', message):
@@ -35,18 +37,18 @@ async def handle_stats_query_message(message: str, game_type: str) -> tuple[str
message = message.strip() message = message.strip()
break break
if message == '' or message.isspace(): if message == '' or message.isspace():
return (None, ('用户名为空', None)) return None, ('用户名为空', None)
if message.startswith('[CQ:at,qq='): if message.startswith('[CQ:at,qq='):
try: try:
user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0]) user = int(str(message).split('[CQ:at,qq=')[1].split(']')[0])
except ValueError: except ValueError:
return (None, ('QQ号码不合法', None)) return None, ('QQ号码不合法', None)
else: else:
return ('AT', (None, user)) return 'AT', (None, user)
elif message in _me: elif message in _me:
# 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢 # 会不会有人叫本姑娘 本大爷这种或许可以成为id的名字呢
# TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName # TODO: 在判断是否可能是查自己的情况的时候 也去判断是否能成立为一个UserName
return ('ME', (None, None)) return 'ME', (None, None)
else: else:
return await check_name(message, game_type) return await check_name(message, game_type)
@@ -55,21 +57,22 @@ async def check_name(name: str, game_type: str) -> tuple[str | None, tuple]:
'''返回值为tuple[gameType, tuple[message, user]]''' '''返回值为tuple[gameType, tuple[message, user]]'''
if game_type == 'IO': if game_type == 'IO':
if match(r'^[a-f0-9]{24}$', name): if match(r'^[a-f0-9]{24}$', name):
return ('ID', (None, name)) return 'ID', (None, name)
if match(r'^[a-zA-Z0-9_-]{3,16}$', name): if match(r'^[a-zA-Z0-9_-]{3,16}$', name):
return ('Name', (None, name.lower())) return 'Name', (None, name.lower())
return (None, ('用户名不合法', None)) return None, ('用户名不合法', None)
if game_type == 'TOP': if game_type == 'TOP':
if match(r'^[a-zA-Z0-9_]{1,16}$', name): if match(r'^[a-zA-Z0-9_]{1,16}$', name):
return ('Name', (None, name)) return 'Name', (None, name)
return (None, ('用户名不合法', None)) return None, ('用户名不合法', None)
if game_type == 'TOS': if game_type == 'TOS':
if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name) if (match(r'^(?!\.)(?!com[0-9]$)(?!con$)(?!lpt[0-9]$)(?!nul$)(?!prn$)[^\-][^\+][^\|\*\?\\\s\!:<>/$"]*[^\.\|\*\?\\\s\!:<>/$"]+$', name)
and name.isdigit() is False and name.isdigit() is False
and 2 <= len(name) <= 18): and 2 <= len(name) <= 18):
# 虽然我也不想这么长 但是似乎确实得这么长 # 虽然我也不想这么长 但是似乎确实得这么长
return ('Name', (None, name)) # TODO 简化正则表达式
return 'Name', (None, name)
if name.isdigit() is True: if name.isdigit() is True:
return ('QQ', (None, name)) return 'QQ', (None, name)
return (None, ('用户名不合法', None)) return None, ('用户名不合法', None)
return (None, ('游戏类型错误', None)) return None, ('游戏类型错误', None)