💥 🗃️ 将 pydantic 模型序列化后再存数据库

This commit is contained in:
2023-11-21 20:47:56 +08:00
parent 60605d0dca
commit 1e02858913
4 changed files with 135 additions and 10 deletions

View File

@@ -0,0 +1,112 @@
"""Recreate HistoricalData
迁移 ID: 9f6582279ce2
父迁移: 9cd1647db502
创建时间: 2023-11-21 08:35:50.393246
"""
from __future__ import annotations
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import sqlite
from nonebot_plugin_tetris_stats.db.models import PydanticType
revision: str = '9f6582279ce2'
down_revision: str | Sequence[str] | None = '9cd1647db502'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_historicaldata_command_type')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_historicaldata_game_platform')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_historicaldata_source_account')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_historicaldata_source_type')
op.drop_table('nonebot_plugin_tetris_stats_historicaldata')
op.create_table(
'nonebot_plugin_tetris_stats_historicaldata',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('trigger_time', sa.DateTime(), nullable=False),
sa.Column('bot_platform', sa.String(length=32), nullable=True),
sa.Column('bot_account', sa.String(), nullable=True),
sa.Column('source_type', sa.String(length=32), nullable=True),
sa.Column('source_account', sa.String(), nullable=True),
sa.Column('message', sa.PickleType(), nullable=True),
sa.Column('game_platform', sa.String(length=32), nullable=False),
sa.Column('command_type', sa.String(length=16), nullable=False),
sa.Column('command_args', sa.JSON(), nullable=False),
sa.Column('game_user', PydanticType(), nullable=False),
sa.Column('processed_data', PydanticType(), nullable=False),
sa.Column('finish_time', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_historicaldata')),
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_command_type'), ['command_type'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_game_platform'), ['game_platform'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_source_account'), ['source_account'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_source_type'), ['source_type'], unique=False
)
# ### end Alembic commands ###
def downgrade(name: str = '') -> None:
if name:
return
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_source_type'))
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_source_account'))
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_game_platform'))
batch_op.drop_index(batch_op.f('ix_nonebot_plugin_tetris_stats_historicaldata_command_type'))
op.drop_table('nonebot_plugin_tetris_stats_historicaldata')
op.create_table(
'nonebot_plugin_tetris_stats_historicaldata',
sa.Column('id', sa.INTEGER(), nullable=False),
sa.Column('trigger_time', sa.DATETIME(), nullable=False),
sa.Column('bot_platform', sa.VARCHAR(length=32), nullable=True),
sa.Column('bot_account', sa.VARCHAR(), nullable=True),
sa.Column('source_type', sa.VARCHAR(length=32), nullable=True),
sa.Column('source_account', sa.VARCHAR(), nullable=True),
sa.Column('message', sa.BLOB(), nullable=True),
sa.Column('game_platform', sa.VARCHAR(length=32), nullable=False),
sa.Column('command_type', sa.VARCHAR(length=16), nullable=False),
sa.Column('command_args', sqlite.JSON(), nullable=False),
sa.Column('game_user', sa.BLOB(), nullable=False),
sa.Column('processed_data', sa.BLOB(), nullable=False),
sa.Column('finish_time', sa.DATETIME(), nullable=False),
sa.PrimaryKeyConstraint('id', name='pk_nonebot_plugin_tetris_stats_historicaldata'),
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_historicaldata', schema=None) as batch_op:
batch_op.create_index(
'ix_nonebot_plugin_tetris_stats_historicaldata_source_type', ['source_type'], unique=False
)
batch_op.create_index(
'ix_nonebot_plugin_tetris_stats_historicaldata_source_account', ['source_account'], unique=False
)
batch_op.create_index(
'ix_nonebot_plugin_tetris_stats_historicaldata_game_platform', ['game_platform'], unique=False
)
batch_op.create_index(
'ix_nonebot_plugin_tetris_stats_historicaldata_command_type', ['command_type'], unique=False
)
# ### end Alembic commands ###

View File

@@ -2,13 +2,26 @@ from datetime import datetime
from nonebot.adapters import Message from nonebot.adapters import Message
from nonebot_plugin_orm import Model from nonebot_plugin_orm import Model
from sqlalchemy import JSON, DateTime, PickleType, String from pydantic import BaseModel
from sqlalchemy import JSON, DateTime, Dialect, PickleType, String, TypeDecorator
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from ..game_data_processor.schemas import BaseProcessedData, BaseUser from ..game_data_processor.schemas import BaseProcessedData, BaseUser
from ..utils.typing import CommandType, GameType from ..utils.typing import CommandType, GameType
class PydanticType(TypeDecorator):
impl = JSON
def process_bind_param(self, value: BaseModel, dialect: Dialect) -> str:
# 将 Pydantic 模型实例转换为 JSON
return value.json()
def process_result_value(self, value: str, dialect: Dialect) -> BaseModel:
# 将 JSON 转换回 Pydantic 模型实例
return BaseModel.parse_raw(value)
class Bind(MappedAsDataclass, Model): class Bind(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True) id: Mapped[int] = mapped_column(init=False, primary_key=True)
chat_platform: Mapped[str] = mapped_column(String(32), index=True) chat_platform: Mapped[str] = mapped_column(String(32), index=True)
@@ -28,6 +41,6 @@ class HistoricalData(MappedAsDataclass, Model):
game_platform: Mapped[GameType] = mapped_column(String(32), index=True, init=False) game_platform: Mapped[GameType] = mapped_column(String(32), index=True, init=False)
command_type: Mapped[CommandType] = mapped_column(String(16), index=True, init=False) command_type: Mapped[CommandType] = mapped_column(String(16), index=True, init=False)
command_args: Mapped[list[str]] = mapped_column(JSON, init=False) command_args: Mapped[list[str]] = mapped_column(JSON, init=False)
game_user: Mapped[BaseUser] = mapped_column(PickleType, init=False) game_user: Mapped[BaseUser] = mapped_column(PydanticType, init=False)
processed_data: Mapped[BaseProcessedData] = mapped_column(PickleType, init=False) processed_data: Mapped[BaseProcessedData] = mapped_column(PydanticType, init=False)
finish_time: Mapped[datetime] = mapped_column(DateTime, init=False) finish_time: Mapped[datetime] = mapped_column(DateTime, init=False)

View File

@@ -130,9 +130,9 @@ class Processor(ProcessorMeta):
"""获取用户数据""" """获取用户数据"""
if other_parameter is None: if other_parameter is None:
other_parameter = {} other_parameter = {}
fset: frozenset[tuple[str, str | bytes]] = frozenset(other_parameter.items()) params = urlencode(dict(sorted(other_parameter.items())))
if self.processed_data.user_profile.get(fset) is None: if self.processed_data.user_profile.get(params) is None:
self.raw_response.user_profile[fset] = await Request.request( self.raw_response.user_profile[params] = await Request.request(
splice_url( splice_url(
[ [
BASE_URL, BASE_URL,
@@ -141,8 +141,8 @@ class Processor(ProcessorMeta):
] ]
) )
) )
self.processed_data.user_profile[fset] = UserProfile.parse_raw(self.raw_response.user_profile[fset]) self.processed_data.user_profile[params] = UserProfile.parse_raw(self.raw_response.user_profile[params])
return self.processed_data.user_profile[fset] return self.processed_data.user_profile[params]
async def get_game_data(self) -> GameData | None: async def get_game_data(self) -> GameData | None:
"""获取游戏数据""" """获取游戏数据"""

View File

@@ -4,10 +4,10 @@ from .user_profile import UserProfile
class RawResponse(BaseRawResponse): class RawResponse(BaseRawResponse):
user_profile: dict[frozenset[tuple[str, str | bytes]], bytes] user_profile: dict[str, bytes]
user_info: bytes | None = None user_info: bytes | None = None
class ProcessedData(BaseProcessedData): class ProcessedData(BaseProcessedData):
user_profile: dict[frozenset[tuple[str, str | bytes]], UserProfile] user_profile: dict[str, UserProfile]
user_info: InfoSuccess | None = None user_info: InfoSuccess | None = None