mirror of
https://github.com/A-Minos/nonebot-plugin-tetris-stats.git
synced 2026-03-05 05:36:54 +08:00
95 lines
3.0 KiB
Python
95 lines
3.0 KiB
Python
from datetime import datetime, timezone
|
|
from typing import ClassVar
|
|
|
|
from nonebot import get_driver, get_plugin
|
|
from nonebot.adapters import Bot, Event
|
|
from nonebot.matcher import Matcher
|
|
from nonebot.message import run_postprocessor, run_preprocessor
|
|
from nonebot_plugin_orm import get_session
|
|
|
|
from ..db.models import HistoricalData
|
|
|
|
UTC = timezone.utc
|
|
|
|
driver = get_driver()
|
|
|
|
|
|
class Recorder:
|
|
matchers: ClassVar[set[type[Matcher]]] = set()
|
|
historical_data: ClassVar[dict[int, tuple[HistoricalData, bool]]] = {}
|
|
error_event: ClassVar[set[int]] = set()
|
|
|
|
@classmethod
|
|
def create_historical_data(cls, event_id: int, historical_data: HistoricalData) -> None:
|
|
cls.historical_data[event_id] = (historical_data, False)
|
|
|
|
@classmethod
|
|
def update_historical_data(cls, event_id: int, historical_data: HistoricalData) -> None:
|
|
if event_id not in cls.historical_data:
|
|
raise KeyError
|
|
cls.historical_data[event_id] = (historical_data, True)
|
|
|
|
@classmethod
|
|
def get_historical_data(cls, event_id: int) -> HistoricalData:
|
|
return cls.historical_data[event_id][0]
|
|
|
|
@classmethod
|
|
async def save_historical_data(cls, event_id: int) -> None:
|
|
historical_data, completed = cls.del_historical_data(event_id)
|
|
if completed:
|
|
async with get_session() as session:
|
|
session.add(historical_data)
|
|
await session.commit()
|
|
|
|
@classmethod
|
|
def del_historical_data(cls, event_id: int) -> tuple[HistoricalData, bool]:
|
|
return cls.historical_data.pop(event_id)
|
|
|
|
@classmethod
|
|
def add_error_event(cls, event_id: int) -> None:
|
|
cls.error_event.add(event_id)
|
|
|
|
@classmethod
|
|
def del_error_event(cls, event_id: int) -> None:
|
|
cls.error_event.remove(event_id)
|
|
|
|
@classmethod
|
|
def is_error_event(cls, event_id: int) -> bool:
|
|
return event_id in cls.error_event
|
|
|
|
|
|
@driver.on_startup
|
|
def _():
|
|
plugin = get_plugin('nonebot_plugin_tetris_stats')
|
|
if plugin is not None:
|
|
Recorder.matchers = plugin.matcher
|
|
else:
|
|
raise RuntimeError('获取不到自身插件对象')
|
|
|
|
|
|
@run_preprocessor
|
|
def _(bot: Bot, event: Event, matcher: Matcher):
|
|
if isinstance(matcher, tuple(Recorder.matchers)):
|
|
Recorder.create_historical_data(
|
|
event_id=id(event),
|
|
historical_data=HistoricalData(
|
|
trigger_time=datetime.now(tz=UTC),
|
|
bot_platform=bot.type,
|
|
bot_account=bot.self_id,
|
|
source_type=event.get_type(),
|
|
source_account=event.get_session_id(),
|
|
message=event.get_message(),
|
|
),
|
|
)
|
|
|
|
|
|
@run_postprocessor
|
|
async def _(event: Event, matcher: Matcher, exception: Exception | None):
|
|
if isinstance(matcher, tuple(Recorder.matchers)):
|
|
event_id = id(event)
|
|
if exception is not None:
|
|
Recorder.add_error_event(event_id)
|
|
Recorder.del_historical_data(event_id)
|
|
else:
|
|
await Recorder.save_historical_data(event_id)
|