diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor.py deleted file mode 100644 index ffb6b10..0000000 --- a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor.py +++ /dev/null @@ -1,360 +0,0 @@ -import os -from asyncio import gather -from re import I -from typing import Any - -import aiohttp -from nonebot import get_driver, on_regex -from nonebot.adapters.onebot.v11 import GROUP, MessageEvent -from nonebot.log import logger -from nonebot.matcher import Matcher -from playwright.async_api import Browser, Response, async_playwright -from ujson import JSONDecodeError, dumps, loads - -from ..utils.config import Config -from ..utils.database import DataBase -from ..utils.message_analyzer import ( - handle_bind_message, - handle_stats_query_message -) - -IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP) -IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP) - -driver = get_driver() - -config = Config.parse_obj(get_driver().config) - - -@IOBind.handle() -async def _(event: MessageEvent, matcher: Matcher): - decoded_message = await handle_bind_message(message=event.raw_message, game_type='IO') - if decoded_message[0] is None: - await matcher.finish(decoded_message[1][0]) - if decoded_message[0] == 'ID': - user_id_stats = await check_user_id(decoded_message[1][1]) - if user_id_stats[0] is False: - await matcher.finish(user_id_stats[1]) - else: - user_id = decoded_message[1][1] - elif decoded_message[0] == 'Name': - user_data = await get_user_data(user_name=decoded_message[1][1]) - if user_data[0] is False: - await matcher.finish('用户信息请求失败') - elif user_data[1] is False: - await matcher.finish(f'用户信息请求错误:\n{user_data[2]["error"]}') - else: - user_id = await get_user_id(user_data[2]) - if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是( - logger.error('获取QQ号失败') - await matcher.finish('获取QQ号失败') - await matcher.finish( - await DataBase.write_bind_info( - qq_number=event.sender.user_id, - user=user_id, - game_type='IO' - ) - ) - - -@IOStats.handle() -async def _(event: MessageEvent, matcher: Matcher): - decoded_message = await handle_stats_query_message(message=event.raw_message, game_type='IO') - if decoded_message[0] is None: - await matcher.finish(decoded_message[1][0]) - elif decoded_message[0] == 'AT': - if event.is_tome() is True: - await matcher.finish('不能查询bot的信息') - bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO') - if bind_info is None: - message = '未查询到绑定信息' - else: - message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}') - elif decoded_message[0] == 'ME': - if event.sender.user_id is None: - logger.error('获取QQ号失败') - await matcher.finish('获取QQ号失败, 请联系bot主人') - bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='IO') - if bind_info is None: - message = '未查询到绑定信息' - else: - message = (f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await generate_message(user_id=bind_info)}') - elif decoded_message[0] == 'ID': - message = await generate_message(user_id=decoded_message[1][1]) - elif decoded_message[0] == 'Name': - message = await generate_message(user_name=decoded_message[1][1]) - await matcher.finish(message) - - -@driver.on_startup -async def _(): - await Request.init_cache() - await Request.read_cache() - - -@driver.on_shutdown -async def _(): - await Request.close_browser() - await Request.write_cache() - - -async def get_user_data( - user_name: str = None, - user_id: str = None -) -> tuple[bool, bool, dict[str, Any]]: - '''获取用户数据''' - if user_name is not None and user_id is None: - user_data_url = f'https://ch.tetr.io/api/users/{user_name}' - elif user_name is None and user_id is not None: - user_data_url = f'https://ch.tetr.io/api/users/{user_id}' - else: - raise ValueError('预期外行为, 请上报GitHub') - return await Request.request(user_data_url) - - -async def get_solo_data( - user_name: str = None, - user_id: str = None -) -> tuple[bool, bool, dict[str, Any]]: - '''获取Solo数据''' - if user_name is not None and user_id is None: - user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records' - elif user_name is None and user_id is not None: - user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records' - else: - raise ValueError('预期外行为, 请上报GitHub') - return await Request.request(user_solo_url) - - -async def get_user_id(user_data: dict) -> str: - '''获取用户ID''' - return user_data['data']['user']['_id'] - - -async def check_user_id(user_id: str) -> tuple[bool, str]: - '''检查用户ID是否有效''' - user_data = await get_user_data(user_id=user_id) - if user_data[0] is False: - return False, '用户信息请求失败' - if user_data[1] is False: - return False, f'用户信息请求错误:\n{user_data[2]["error"]}' - if user_id == user_data[2]['data']['user']['_id']: - return True, '' - raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x') - - -async def get_league_stats(user_data: dict) -> dict[str, Any]: - '''获取排位统计数据''' - league = user_data['data']['user']['league'] - league_stats: dict[str, Any] = {} - if league['gamesplayed'] != 0: - league_stats['PPS'] = league['pps'] - league_stats['APM'] = league['apm'] - league_stats['VS'] = 0 if league['vs'] is None else league['vs'] - league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper() - if league['rating'] == -1: - league_stats['Rank'] = None - else: - league_stats['Rating'] = round(league['rating'], 2) - league_stats['Glicko'] = round(league['glicko'], 2) - league_stats['RD'] = round(league['rd'], 2) - league_stats['Standing'] = league['standing'] - league_stats['LPM'] = round((league['pps'] * 24), 2) - league_stats['APL'] = round( - (league_stats['APM'] / league_stats['LPM']), 2) - league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2) - league_stats['ADPL'] = round( - (league_stats['ADPM'] / league_stats['LPM']), 2) - return league_stats - - -async def get_sprint_stats(solo_data: dict) -> dict[str, Any]: - '''获取40L统计数据''' - sprint_stats = {} - solo = solo_data['data']['records']['40l'] - if solo['record'] is not None: - sprint_stats['Time'] = round( - solo['record']['endcontext']['finalTime'] / 1000, 2) - if solo['rank'] is not None: - sprint_stats['Rank'] = solo['rank'] - return sprint_stats - - -async def get_blitz_stats(solo_data: dict) -> dict[str, Any]: - '''获取Blitz统计数据''' - blitz_stats = {} - blitz = solo_data['data']['records']['blitz'] - if blitz['record'] is not None: - blitz_stats['Score'] = blitz['record']['endcontext']['score'] - if blitz['rank'] is not None: - blitz_stats['Rank'] = blitz['rank'] - return blitz_stats - - -async def generate_message(user_name: str = None, user_id: str = None) -> str: - '''生成消息''' - user_data, solo_data = await gather( - get_user_data(user_name=user_name, user_id=user_id), - get_solo_data(user_name=user_name, user_id=user_id) - ) - if user_data[0] is False: - return '用户信息请求失败' - if user_data[1] is False: - return f'用户信息请求错误:\n{user_data[2]["error"]}' - user_name = user_data[2]['data']['user']['username'].upper() - league_stats = await get_league_stats(user_data[2]) - message = '' - if not league_stats: - message += f'用户 {user_name} 没有排位统计数据' - else: - if league_stats['Rank'] is None: - message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:' - else: - if league_stats['Rank'] == 'Z': - message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR' - else: - message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})' - message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:' - message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )' - message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )' - if league_stats["VS"] != 0: - message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )' - if solo_data[0] is False: - return f'{message}\nSolo统计数据请求失败' - if solo_data[1] is False: - return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}' - sprint_stats, blitz_stats = await gather( - get_sprint_stats(solo_data[2]), - get_blitz_stats(solo_data[2]) - ) - message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else '' - message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else '' - message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else '' - message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else '' - return message - - -class Request: - '''网络请求相关类''' - _browser: Browser | None = None - _headers: dict | None = None - _cookies: dict | None = None - - @classmethod - async def _init_playwright(cls) -> Browser: - '''初始化playwright''' - playwright = await async_playwright().start() - cls._browser = await playwright.firefox.launch() - return cls._browser - - @classmethod - async def _get_browser(cls) -> Browser: - '''获取浏览器对象''' - return cls._browser or await cls._init_playwright() - - @classmethod - async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]: - '''用firefox硬穿五秒盾''' - browser = await cls._get_browser() - context = await browser.new_context() - page = await context.new_page() - response = await page.goto(url) - attempts = 0 - while attempts < 60: - attempts += 1 - text = await page.locator("body").text_content() - if text is None: - await page.wait_for_timeout(1000) - continue - if await page.title() == 'Please Wait... | Cloudflare': - # TODO 有无人来做一个过验证码( - break - try: - data = loads(text) - except JSONDecodeError: - await page.wait_for_timeout(1000) - else: - assert isinstance(response, Response) - cls._headers = await response.request.all_headers() - cls._cookies = {i['name']: i['value'] for i in await context.cookies()} - await page.close() - await context.close() - return True, data['success'], data - await page.close() - await context.close() - return True, False, {'error': '绕过五秒盾失败'} - - @classmethod - async def init_cache(cls) -> None: - '''初始化缓存文件''' - if not os.path.exists(os.path.dirname(config.cache_path)): - os.makedirs(os.path.dirname(config.cache_path)) - if not os.path.exists(config.cache_path): - with open(file=config.cache_path, mode='w', encoding='UTF-8') as file: - file.write( - dumps( - { - 'headers': cls._headers, - 'cookies': cls._cookies - } - ) - ) - - @classmethod - async def read_cache(cls) -> None: - '''读取缓存文件''' - try: - with open(file=config.cache_path, mode='r', encoding='UTF-8') as file: - json = loads(file.read()) - cls._headers = json['headers'] - cls._cookies = json['cookies'] - except FileNotFoundError: - await cls.init_cache() - except PermissionError: - os.remove(config.cache_path) - await cls.init_cache() - except JSONDecodeError: - os.remove(config.cache_path) - await cls.init_cache() - - @classmethod - async def write_cache(cls) -> None: - '''写入缓存文件''' - try: - with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file: - file.write( - dumps( - { - 'headers': cls._headers, - 'cookies': cls._cookies - } - ) - ) - except FileNotFoundError: - await cls.init_cache() - except PermissionError: - os.remove(config.cache_path) - await cls.init_cache() - except JSONDecodeError: - os.remove(config.cache_path) - await cls.init_cache() - - @classmethod - async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]: - '''请求api''' - try: - async with aiohttp.ClientSession(cookies=cls._cookies) as session: - async with session.get(url, headers=cls._headers) as resp: - data = await resp.json() - return True, data['success'], data - except aiohttp.client_exceptions.ClientConnectorError as error: - logger.error(f'请求错误\n{error}') - return False, False, {} - except aiohttp.client_exceptions.ContentTypeError: - return await cls._anti_cloudflare(url) - - @classmethod - async def close_browser(cls) -> None: - '''关闭浏览器对象''' - if isinstance(cls._browser, Browser): - await cls._browser.close() diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/__init__.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/__init__.py new file mode 100644 index 0000000..1fabd68 --- /dev/null +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/__init__.py @@ -0,0 +1,30 @@ +from re import I + +from nonebot import on_regex +from nonebot.adapters.onebot.v11 import GROUP, MessageEvent +from nonebot.matcher import Matcher + +from .processor import Processor + +IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP) +IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP) + + +@IOBind.handle() +async def _(event: MessageEvent, matcher: Matcher): + await matcher.finish( + await Processor.handle_bind( + message=event.raw_message, + qq_number=event.sender.user_id + ) + ) + + +@IOStats.handle() +async def _(event: MessageEvent, matcher: Matcher): + await matcher.finish( + await Processor.handle_query( + message=event.raw_message, + qq_number=event.sender.user_id + ) + ) diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py new file mode 100644 index 0000000..96963ae --- /dev/null +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/processor.py @@ -0,0 +1,205 @@ +from asyncio import gather +from typing import Any + +from nonebot.log import logger + +from ...utils.database import DataBase +from ...utils.message_analyzer import ( + handle_bind_message, + handle_stats_query_message +) +from .request import Request + + +class Processor: + @classmethod + async def handle_bind(cls, message: str, qq_number: int | None) -> str: + '''处理绑定消息''' + decoded_message = await handle_bind_message(message=message, game_type='IO') + if decoded_message[0] is None: + return decoded_message[1][0] + if decoded_message[0] == 'ID': + user_id_stats = await cls.check_user_id(decoded_message[1][1]) + if user_id_stats[0] is False: + return user_id_stats[1] + user_id = decoded_message[1][1] + elif decoded_message[0] == 'Name': + user_data = await cls.get_user_data(user_name=decoded_message[1][1]) + if user_data[0] is False: + return '用户信息请求失败' + if user_data[1] is False: + return f'用户信息请求错误:\n{user_data[2]["error"]}' + user_id = await cls.get_user_id(user_data[2]) + if qq_number is None: # 理论上是不会有None出现的, ide快乐行属于是( + logger.error('获取QQ号失败') + return '获取QQ号失败' + return ( + await DataBase.write_bind_info( + qq_number=qq_number, + user=user_id, + game_type='IO' + ) + ) + + @classmethod + async def handle_query(cls, message: str, qq_number: int | None): + '''处理查询消息''' + decoded_message = await handle_stats_query_message(message=message, game_type='IO') + if decoded_message[0] is None: + return decoded_message[1][0] + if decoded_message[0] == 'AT': # 在入口处判断是否@bot本身 + bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO') + if bind_info is None: + return '未查询到绑定信息' + return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}' + if decoded_message[0] == 'ME': + if qq_number is None: + logger.error('获取QQ号失败') + return '获取QQ号失败, 请联系bot主人' + bind_info = await DataBase.query_bind_info(qq_number=qq_number, game_type='IO') + if bind_info is None: + return '未查询到绑定信息' + return f'* 由于无法验证绑定信息, 不能保证查询到的用户为本人\n{await Processor.generate_message(user_id=bind_info)}' + if decoded_message[0] == 'ID': + return await Processor.generate_message(user_id=decoded_message[1][1]) + if decoded_message[0] == 'Name': + return await Processor.generate_message(user_name=decoded_message[1][1]) + + @classmethod + async def get_user_data( + cls, + user_name: str = None, + user_id: str = None + ) -> tuple[bool, bool, dict[str, Any]]: + '''获取用户数据''' + if user_name is not None and user_id is None: + user_data_url = f'https://ch.tetr.io/api/users/{user_name}' + elif user_name is None and user_id is not None: + user_data_url = f'https://ch.tetr.io/api/users/{user_id}' + else: + raise ValueError('预期外行为, 请上报GitHub') + return await Request.request(user_data_url) + + @classmethod + async def get_solo_data( + cls, + user_name: str = None, + user_id: str = None + ) -> tuple[bool, bool, dict[str, Any]]: + '''获取Solo数据''' + if user_name is not None and user_id is None: + user_solo_url = f'https://ch.tetr.io/api/users/{user_name}/records' + elif user_name is None and user_id is not None: + user_solo_url = f'https://ch.tetr.io/api/users/{user_id}/records' + else: + raise ValueError('预期外行为, 请上报GitHub') + return await Request.request(user_solo_url) + + @classmethod + async def get_user_id(cls, user_data: dict) -> str: + '''获取用户ID''' + return user_data['data']['user']['_id'] + + @classmethod + async def check_user_id(cls, user_id: str) -> tuple[bool, str]: + '''检查用户ID是否有效 返回值为tuple[bool, message]''' + user_data = await cls.get_user_data(user_id=user_id) + if user_data[0] is False: + return False, '用户信息请求失败' + if user_data[1] is False: + return False, f'用户信息请求错误:\n{user_data[2]["error"]}' + if user_id == user_data[2]['data']['user']['_id']: + return True, '' + raise ValueError('服务器返回的userID和用户提供的不一致, 这种情况理论上不应该发生, 以防万一还是写一下(x') + + @classmethod + async def get_league_stats(cls, user_data: dict) -> dict[str, Any]: + '''获取排位统计数据''' + league = user_data['data']['user']['league'] + league_stats: dict[str, Any] = {} + if league['gamesplayed'] != 0: + league_stats['PPS'] = league['pps'] + league_stats['APM'] = league['apm'] + league_stats['VS'] = 0 if league['vs'] is None else league['vs'] + league_stats['Rank'] = 'Z' if league['rank'] == 'z' else league['rank'].upper( + ) + if league['rating'] == -1: + league_stats['Rank'] = None + else: + league_stats['Rating'] = round(league['rating'], 2) + league_stats['Glicko'] = round(league['glicko'], 2) + league_stats['RD'] = round(league['rd'], 2) + league_stats['Standing'] = league['standing'] + league_stats['LPM'] = round((league['pps'] * 24), 2) + league_stats['APL'] = round( + (league_stats['APM'] / league_stats['LPM']), 2) + league_stats['ADPM'] = round((league_stats['VS'] * 0.6), 2) + league_stats['ADPL'] = round( + (league_stats['ADPM'] / league_stats['LPM']), 2) + return league_stats + + @classmethod + async def get_sprint_stats(cls, solo_data: dict) -> dict[str, Any]: + '''获取40L统计数据''' + sprint_stats = {} + solo = solo_data['data']['records']['40l'] + if solo['record'] is not None: + sprint_stats['Time'] = round( + solo['record']['endcontext']['finalTime'] / 1000, 2) + if solo['rank'] is not None: + sprint_stats['Rank'] = solo['rank'] + return sprint_stats + + @classmethod + async def get_blitz_stats(cls, solo_data: dict) -> dict[str, Any]: + '''获取Blitz统计数据''' + blitz_stats = {} + blitz = solo_data['data']['records']['blitz'] + if blitz['record'] is not None: + blitz_stats['Score'] = blitz['record']['endcontext']['score'] + if blitz['rank'] is not None: + blitz_stats['Rank'] = blitz['rank'] + return blitz_stats + + @classmethod + async def generate_message(cls, user_name: str = None, user_id: str = None) -> str: + '''生成消息''' + user_data, solo_data = await gather( + cls.get_user_data(user_name=user_name, user_id=user_id), + cls.get_solo_data(user_name=user_name, user_id=user_id) + ) + if user_data[0] is False: + return '用户信息请求失败' + if user_data[1] is False: + return f'用户信息请求错误:\n{user_data[2]["error"]}' + user_name = user_data[2]['data']['user']['username'].upper() + league_stats = await cls.get_league_stats(user_data[2]) + message = '' + if not league_stats: + message += f'用户 {user_name} 没有排位统计数据' + else: + if league_stats['Rank'] is None: + message += f'用户 {user_name} 暂未完成定级赛, 最近十场的数据:' + else: + if league_stats['Rank'] == 'Z': + message += f'用户 {user_name} 暂无段位, {league_stats["Rating"]} TR' + else: + message += f'{league_stats["Rank"]} 段用户 {user_name} {league_stats["Rating"]} TR (#{league_stats["Standing"]})' + message += f', 段位分 {league_stats["Glicko"]}±{league_stats["RD"]}, 最近十场的数据:' + message += f'\nL\'PM: {league_stats["LPM"]} ( {league_stats["PPS"]} pps )' + message += f'\nAPM: {league_stats["APM"]} ( x{league_stats["APL"]} )' + if league_stats["VS"] != 0: + message += f'\nADPM: {league_stats["ADPM"]} ( x{league_stats["ADPL"]} ) ( {league_stats["VS"]}vs )' + if solo_data[0] is False: + return f'{message}\nSolo统计数据请求失败' + if solo_data[1] is False: + return f'{message}\nSolo统计数据请求错误:\n{solo_data[2]["error"]}' + sprint_stats, blitz_stats = await gather( + cls.get_sprint_stats(solo_data[2]), + cls.get_blitz_stats(solo_data[2]) + ) + message += f'\n40L: {sprint_stats["Time"]}s' if 'Time' in sprint_stats else '' + message += f' ( #{sprint_stats["Rank"]} )' if 'Rank' in sprint_stats else '' + message += f'\nBlitz: {blitz_stats["Score"]}' if 'Score' in blitz_stats else '' + message += f' ( #{blitz_stats["Rank"]} )' if 'Rank' in blitz_stats else '' + return message diff --git a/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/request.py b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/request.py new file mode 100644 index 0000000..ba4f684 --- /dev/null +++ b/nonebot_plugin_tetris_stats/game_data_processor/io_data_processor/request.py @@ -0,0 +1,152 @@ +import os +from typing import Any + +import aiohttp +from nonebot import get_driver +from nonebot.log import logger +from playwright.async_api import Browser, Response, async_playwright +from ujson import JSONDecodeError, dumps, loads + +from ...utils.config import Config + +driver = get_driver() + +config = Config.parse_obj(get_driver().config) + + +@driver.on_startup +async def _(): + await Request.init_cache() + await Request.read_cache() + + +@driver.on_shutdown +async def _(): + await Request.close_browser() + await Request.write_cache() + + +class Request: + '''网络请求相关类''' + _browser: Browser | None = None + _headers: dict | None = None + _cookies: dict | None = None + + @classmethod + async def _init_playwright(cls) -> Browser: + '''初始化playwright''' + playwright = await async_playwright().start() + cls._browser = await playwright.firefox.launch() + return cls._browser + + @classmethod + async def _get_browser(cls) -> Browser: + '''获取浏览器对象''' + return cls._browser or await cls._init_playwright() + + @classmethod + async def _anti_cloudflare(cls, url: str) -> tuple[bool, bool, dict[str, Any]]: + '''用firefox硬穿五秒盾''' + browser = await cls._get_browser() + context = await browser.new_context() + page = await context.new_page() + response = await page.goto(url) + attempts = 0 + while attempts < 60: + attempts += 1 + text = await page.locator("body").text_content() + if text is None: + await page.wait_for_timeout(1000) + continue + if await page.title() == 'Please Wait... | Cloudflare': + # TODO 有无人来做一个过验证码( + break + try: + data = loads(text) + except JSONDecodeError: + await page.wait_for_timeout(1000) + else: + assert isinstance(response, Response) + cls._headers = await response.request.all_headers() + cls._cookies = {i['name']: i['value'] for i in await context.cookies()} + await page.close() + await context.close() + return True, data['success'], data + await page.close() + await context.close() + return True, False, {'error': '绕过五秒盾失败'} + + @classmethod + async def init_cache(cls) -> None: + '''初始化缓存文件''' + if not os.path.exists(os.path.dirname(config.cache_path)): + os.makedirs(os.path.dirname(config.cache_path)) + if not os.path.exists(config.cache_path): + with open(file=config.cache_path, mode='w', encoding='UTF-8') as file: + file.write( + dumps( + { + 'headers': cls._headers, + 'cookies': cls._cookies + } + ) + ) + + @classmethod + async def read_cache(cls) -> None: + '''读取缓存文件''' + try: + with open(file=config.cache_path, mode='r', encoding='UTF-8') as file: + json = loads(file.read()) + cls._headers = json['headers'] + cls._cookies = json['cookies'] + except FileNotFoundError: + await cls.init_cache() + except PermissionError: + os.remove(config.cache_path) + await cls.init_cache() + except JSONDecodeError: + os.remove(config.cache_path) + await cls.init_cache() + + @classmethod + async def write_cache(cls) -> None: + '''写入缓存文件''' + try: + with open(file=config.cache_path, mode='r+', encoding='UTF-8') as file: + file.write( + dumps( + { + 'headers': cls._headers, + 'cookies': cls._cookies + } + ) + ) + except FileNotFoundError: + await cls.init_cache() + except PermissionError: + os.remove(config.cache_path) + await cls.init_cache() + except JSONDecodeError: + os.remove(config.cache_path) + await cls.init_cache() + + @classmethod + async def request(cls, url: str) -> tuple[bool, bool, dict[str, Any]]: + '''请求api''' + try: + async with aiohttp.ClientSession(cookies=cls._cookies) as session: + async with session.get(url, headers=cls._headers) as resp: + data = await resp.json() + return True, data['success'], data + except aiohttp.client_exceptions.ClientConnectorError as error: + logger.error(f'请求错误\n{error}') + return False, False, {} + except aiohttp.client_exceptions.ContentTypeError: + return await cls._anti_cloudflare(url) + + @classmethod + async def close_browser(cls) -> None: + '''关闭浏览器对象''' + if isinstance(cls._browser, Browser): + await cls._browser.close()