将数据库操作封装成类,并添加数据库路径的config选项

This commit is contained in:
2022-08-09 02:12:29 +08:00
parent 69e4cb97e3
commit b320a23c77
7 changed files with 110 additions and 103 deletions

View File

@@ -8,21 +8,16 @@ from nonebot import get_driver, on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.log import logger
from nonebot.matcher import Matcher
from playwright.async_api import Browser, Response, async_playwright
from ujson import JSONDecodeError, dumps, loads
from playwright.async_api import (
Browser,
Response,
async_playwright
)
from ..utils.config import Config
from ..utils.sql import query_bind_info, write_bind_info
from ..utils.database import DataBase
from ..utils.message_analyzer import (
handle_bind_message,
handle_stats_query_message
)
IOBind = on_regex(pattern=r'^io绑定|^iobind', flags=I, permission=GROUP)
IOStats = on_regex(pattern=r'^io查|^iostats', flags=I, permission=GROUP)
@@ -54,7 +49,7 @@ async def _(event: MessageEvent, matcher: Matcher):
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(
await write_bind_info(
await DataBase.write_bind_info(
qq_number=event.sender.user_id,
user=user_id,
game_type='IO'
@@ -70,7 +65,7 @@ async def _(event: MessageEvent, matcher: Matcher):
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish('不能查询bot的信息')
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
@@ -79,7 +74,7 @@ async def _(event: MessageEvent, matcher: Matcher):
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='IO')
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='IO')
if bind_info is None:
message = '未查询到绑定信息'
else:
@@ -239,6 +234,7 @@ async def generate_message(user_name: str = None, user_id: str = None) -> str:
class Request:
'''网络请求相关类'''
_browser: Browser | None = None
_headers: dict | None = None
_cookies: dict | None = None

View File

@@ -1,16 +1,19 @@
from typing import Any
from re import I
from lxml import etree
from pandas import read_html
import aiohttp
from typing import Any
import aiohttp
from lxml import etree
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from nonebot.matcher import Matcher
from pandas import read_html
from ..utils.message_analyzer import handle_bind_message, handle_stats_query_message
from ..utils.sql import query_bind_info, write_bind_info
from ..utils.database import DataBase
from ..utils.message_analyzer import (
handle_bind_message,
handle_stats_query_message
)
TOPBind = on_regex(pattern=r'^top绑定|^topbind', flags=I, permission=GROUP)
TopStats = on_regex(pattern=r'^top查|^topstats', flags=I, permission=GROUP)
@@ -32,7 +35,7 @@ async def _(event: MessageEvent, matcher: Matcher):
if event.sender.user_id is None: # 理论上是不会有None出现的, ide快乐行属于是
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败')
await matcher.finish(await write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP'))
await matcher.finish(await DataBase.write_bind_info(qq_number=event.sender.user_id, user=user_name, game_type='TOP'))
@TopStats.handle()
@@ -43,7 +46,7 @@ async def _(event: MessageEvent, matcher: Matcher):
elif decoded_message[0] == 'AT':
if event.is_tome() is True:
await matcher.finish(message='不能查询bot的信息')
bind_info = await query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
bind_info = await DataBase.query_bind_info(qq_number=decoded_message[1][1], game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:
@@ -52,7 +55,7 @@ async def _(event: MessageEvent, matcher: Matcher):
if event.sender.user_id is None:
logger.error('获取QQ号失败')
await matcher.finish('获取QQ号失败, 请联系bot主人')
bind_info = await query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
bind_info = await DataBase.query_bind_info(qq_number=event.sender.user_id, game_type='TOP')
if bind_info is None:
message = '未查询到绑定信息'
else:

View File

@@ -1,12 +1,12 @@
from typing import Any
from asyncio import gather
from re import I
import aiohttp
from typing import Any
import aiohttp
from nonebot import on_regex
from nonebot.adapters.onebot.v11 import GROUP, MessageEvent
from nonebot.matcher import Matcher
from nonebot.log import logger
from nonebot.matcher import Matcher
from ..utils.message_analyzer import handle_stats_query_message

View File

@@ -1 +1 @@
from . import message_analyzer, sql, config
from . import config, database, message_analyzer

View File

@@ -3,3 +3,4 @@ from pydantic import BaseModel
class Config(BaseModel):
cache_path: str = 'cache/nonebot_plugin_tetris_stats/cache'
db_path: str = 'data/nonebot_plugin_tetris_stats/data.db'

View File

@@ -0,0 +1,86 @@
import os
from asyncio import gather
from sqlite3 import Connection, connect
from nonebot import get_driver
from nonebot.log import logger
from .config import Config
driver = get_driver()
config = Config.parse_obj(get_driver().config)
@driver.on_startup
async def _():
await DataBase.init_db()
@driver.on_shutdown
async def _():
await DataBase.close_db()
class DataBase():
'''数据库交互类'''
_db: Connection | None = None
@classmethod
async def init_db(cls) -> Connection:
'''初始化数据库'''
if not os.path.exists(os.path.dirname(config.db_path)):
os.makedirs(os.path.dirname(config.db_path))
cls._db = connect(config.db_path)
cursor = cls._db.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cls._db.commit()
logger.info('数据库初始化完成')
return cls._db
@classmethod
async def _get_db(cls) -> Connection:
'''获取数据库对象'''
return cls._db or await cls.init_db()
@classmethod
async def query_bind_info(cls, qq_number: str | int, game_type: str) -> str | None:
'''查询绑定信息'''
db = await cls._get_db()
cursor = db.cursor()
cursor.execute(
f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
user = cursor.fetchone()
if user is None:
return None
return user[0]
@classmethod
async def write_bind_info(cls, qq_number: str | int, user: str, game_type: str) -> str:
'''写入绑定信息'''
bind_info, db = await gather(
cls.query_bind_info(qq_number=qq_number, game_type=game_type),
cls._get_db()
)
cursor = db.cursor()
if bind_info is not None:
cursor.execute(
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
message = '更新成功'
elif bind_info is None:
cursor.execute(
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
message = '绑定成功'
db.commit()
return message
@classmethod
async def close_db(cls) -> None:
'''关闭数据库对象'''
if isinstance(cls._db, Connection):
cls._db.close()

View File

@@ -1,79 +0,0 @@
from sqlite3 import connect, Connection
import os
from nonebot import get_driver
from nonebot.log import logger
_DB_FILE = 'data/nonebot_plugin_tetris_stats/data.db'
_DB: Connection | None = None
driver = get_driver()
@driver.on_startup
async def _():
'''初始化数据库'''
await init_db()
@driver.on_shutdown
async def _():
if isinstance(_DB, Connection):
_DB.close()
async def init_db() -> Connection:
'''初始化数据库'''
if not os.path.exists(os.path.dirname(_DB_FILE)):
if os.path.exists('data/nonebot-plugin-tetris-stats'): # 重命名旧的数据库路径
os.rename('data/nonebot-plugin-tetris-stats',
os.path.dirname(_DB_FILE))
else:
os.makedirs(os.path.dirname(_DB_FILE))
global _DB
_DB = connect(_DB_FILE)
cursor = _DB.cursor()
cursor.execute('''CREATE TABLE IF NOT EXISTS IOBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
cursor.execute('''CREATE TABLE IF NOT EXISTS TOPBIND
(QQ INTEGER NOT NULL,
USER TEXT NOT NULL)''')
_DB.commit()
logger.info('数据库初始化完成')
return _DB
async def get_db() -> Connection:
'''获取数据库对象'''
return _DB or await init_db()
async def query_bind_info(qq_number: str | int, game_type: str) -> str | None:
'''查询绑定信息'''
db = await get_db()
cursor = db.cursor()
cursor.execute(f'SELECT USER FROM {game_type}BIND WHERE QQ = {qq_number}')
user = cursor.fetchone()
db.commit()
if user is None:
return None
return user[0]
async def write_bind_info(qq_number: str | int, user: str, game_type: str) -> str:
'''写入绑定信息'''
bind_info = await query_bind_info(qq_number, game_type)
db = await get_db()
cursor = db.cursor()
if bind_info is not None:
cursor.execute(
f'UPDATE {game_type}BIND SET USER = ? WHERE QQ = ?', (user, qq_number))
message = '更新成功'
elif bind_info is None:
cursor.execute(
f'INSERT INTO {game_type}BIND (QQ, USER) VALUES (?, ?)', (qq_number, user))
message = '绑定成功'
db.commit()
return message