Compare commits

..

11 Commits
1.4.1 ... 1.4.2

Author SHA1 Message Date
435850819c 🔖 1.4.2 2024-08-04 19:57:47 +08:00
6f439ad357 适配新模板 2024-08-04 19:22:36 +08:00
b74cc1f4a0 🐛 修复 TETR.IO 获取 user 时出现 UnboundLocalError 2024-08-04 19:21:52 +08:00
1a1c2675d1 再次更新模板仓库处理逻辑 2024-08-03 23:52:45 +08:00
1f02c107f5 AR排行榜 API 模型 2024-08-03 16:47:57 +08:00
89c319a500 完善 PluginMetadata 2024-08-02 22:46:00 +08:00
56f9a69c4d 🙈 更新 .gitignore 2024-08-02 22:19:59 +08:00
50431fe7cb 新赛季排行榜 API 模型 2024-08-02 22:15:46 +08:00
71ad53a1f9 适配 TETR.IO rank v1 模板 2024-08-02 22:15:46 +08:00
820393f216 🎨 减少两个 overload 2024-08-02 22:15:45 +08:00
27994cea6b 🗃️ 清空 TETR.IO 旧赛季数据 2024-08-02 22:15:45 +08:00
25 changed files with 386 additions and 257 deletions

2
.gitignore vendored
View File

@@ -20,3 +20,5 @@ bot.py
TODO
*.fish
extracted_skin_mino_*
sample_*
TODO*

View File

@@ -1,14 +1,19 @@
from nonebot import require
from nonebot.plugin import PluginMetadata
from nonebot.plugin import PluginMetadata, inherit_supported_adapters
require('nonebot_plugin_alconna')
require('nonebot_plugin_apscheduler')
require('nonebot_plugin_localstore')
require('nonebot_plugin_orm')
require('nonebot_plugin_session_orm')
require('nonebot_plugin_session')
require('nonebot_plugin_user')
require('nonebot_plugin_userinfo')
require_plugins = {
'nonebot_plugin_alconna',
'nonebot_plugin_apscheduler',
'nonebot_plugin_localstore',
'nonebot_plugin_orm',
'nonebot_plugin_session_orm',
'nonebot_plugin_session',
'nonebot_plugin_user',
'nonebot_plugin_userinfo',
}
for i in require_plugins:
require(i)
from nonebot_plugin_alconna import namespace # noqa: E402
@@ -16,6 +21,7 @@ with namespace('tetris_stats') as ns:
ns.enable_message_cache = False
from .config import migrations # noqa: E402
from .config.config import Config # noqa: E402
__plugin_meta__ = PluginMetadata(
name='Tetris Stats',
@@ -23,6 +29,8 @@ __plugin_meta__ = PluginMetadata(
usage='发送 tstats --help 查询使用方法',
type='application',
homepage='https://github.com/A-minos/nonebot-plugin-tetris-stats',
config=Config,
supported_adapters=inherit_supported_adapters(*require_plugins),
extra={
'orm_version_location': migrations,
},

View File

@@ -0,0 +1,91 @@
"""TETR.IO new season
迁移 ID: f5b4a6d1325b
父迁移: a1195e989cc6
创建时间: 2024-08-01 20:44:48.644912
"""
from __future__ import annotations
from typing import TYPE_CHECKING
import sqlalchemy as sa
from alembic import op
if TYPE_CHECKING:
from collections.abc import Sequence
revision: str = 'f5b4a6d1325b'
down_revision: str | Sequence[str] | None = 'a1195e989cc6'
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade(name: str = '') -> None:
if name:
return
with op.batch_alter_table('nonebot_plugin_tetris_stats_iorank', schema=None) as batch_op:
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_iorank_file_hash')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_iorank_rank')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_iorank_update_time')
op.drop_table('nonebot_plugin_tetris_stats_iorank')
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetriohistoricaldata', schema=None) as batch_op:
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_api_type')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_update_time')
batch_op.drop_index('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_user_unique_identifier')
op.drop_table('nonebot_plugin_tetris_stats_tetriohistoricaldata')
op.create_table(
'nonebot_plugin_tetris_stats_tetriohistoricaldata',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_unique_identifier', sa.String(length=24), nullable=False),
sa.Column('api_type', sa.String(length=16), nullable=False),
sa.Column('data', sa.JSON(), nullable=False),
sa.Column('update_time', sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint('id', name=op.f('pk_nonebot_plugin_tetris_stats_tetriohistoricaldata')),
info={'bind_key': 'nonebot_plugin_tetris_stats'},
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_tetriohistoricaldata', schema=None) as batch_op:
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_api_type'), ['api_type'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_update_time'), ['update_time'], unique=False
)
batch_op.create_index(
batch_op.f('ix_nonebot_plugin_tetris_stats_tetriohistoricaldata_user_unique_identifier'),
['user_unique_identifier'],
unique=False,
)
def downgrade(name: str = '') -> None:
if name:
return
op.create_table(
'nonebot_plugin_tetris_stats_iorank',
sa.Column('id', sa.INTEGER(), nullable=False),
sa.Column('rank', sa.VARCHAR(length=2), nullable=False),
sa.Column('tr_line', sa.FLOAT(), nullable=False),
sa.Column('player_count', sa.INTEGER(), nullable=False),
sa.Column('low_pps', sa.JSON(), nullable=False),
sa.Column('low_apm', sa.JSON(), nullable=False),
sa.Column('low_vs', sa.JSON(), nullable=False),
sa.Column('avg_pps', sa.FLOAT(), nullable=False),
sa.Column('avg_apm', sa.FLOAT(), nullable=False),
sa.Column('avg_vs', sa.FLOAT(), nullable=False),
sa.Column('high_pps', sa.JSON(), nullable=False),
sa.Column('high_apm', sa.JSON(), nullable=False),
sa.Column('high_vs', sa.JSON(), nullable=False),
sa.Column('update_time', sa.DATETIME(), nullable=False),
sa.Column('file_hash', sa.VARCHAR(length=128), nullable=True),
sa.PrimaryKeyConstraint('id', name='pk_nonebot_plugin_tetris_stats_iorank'),
)
with op.batch_alter_table('nonebot_plugin_tetris_stats_iorank', schema=None) as batch_op:
batch_op.create_index('ix_nonebot_plugin_tetris_stats_iorank_update_time', ['update_time'], unique=False)
batch_op.create_index('ix_nonebot_plugin_tetris_stats_iorank_rank', ['rank'], unique=False)
batch_op.create_index('ix_nonebot_plugin_tetris_stats_iorank_file_hash', ['file_hash'], unique=False)

View File

@@ -83,8 +83,8 @@ class Player:
ID=user_info.data.id,
name=user_info.data.username,
)
self.user_id = user_info.data.id
self.user_name = user_info.data.username
self.user_id = self.__user.ID
self.user_name = self.__user.name
return self.__user
async def get_info(self) -> UserInfoSuccess:
@@ -108,13 +108,9 @@ class Player:
return self._user_info
@overload
async def get_summaries(self, summaries_type: Literal['40l']) -> SoloSuccessModel: ...
async def get_summaries(self, summaries_type: Literal['40l', 'blitz']) -> SoloSuccessModel: ...
@overload
async def get_summaries(self, summaries_type: Literal['blitz']) -> SoloSuccessModel: ...
@overload
async def get_summaries(self, summaries_type: Literal['zenith']) -> ZenithSuccessModel: ...
@overload
async def get_summaries(self, summaries_type: Literal['zenithex']) -> ZenithSuccessModel: ...
async def get_summaries(self, summaries_type: Literal['zenith', 'zenithex']) -> ZenithSuccessModel: ...
@overload
async def get_summaries(self, summaries_type: Literal['zen']) -> ZenSuccessModel: ...
@overload

View File

@@ -4,6 +4,12 @@ from typing import Literal
from pydantic import BaseModel
class P(BaseModel): # what is P
pri: float
sec: float
ter: float
class Cache(BaseModel):
status: str
cached_at: datetime

View File

@@ -0,0 +1,27 @@
from pydantic import BaseModel, Field
from ..base import SuccessModel
from .base import Entry as BaseEntry
class ArCounts(BaseModel):
bronze: int | None = Field(None, alias='1')
silver: int | None = Field(None, alias='2')
gold: int | None = Field(None, alias='3')
platinum: int | None = Field(None, alias='4')
diamond: int | None = Field(None, alias='5')
issued: int | None = Field(None, alias='100')
top10: int | None = Field(None, alias='t10')
class Entry(BaseEntry):
ar: int
ar_counts: ArCounts
class Data(BaseModel):
entries: list[Entry]
class ArSuccessModel(SuccessModel):
data: Data

View File

@@ -0,0 +1,30 @@
from datetime import datetime
from pydantic import BaseModel, Field
from ...typing import Rank
from ..base import P
class League(BaseModel):
gamesplayed: int
gameswon: int
rating: int
rank: Rank
decaying: bool
class Entry(BaseModel):
id: str = Field(..., alias='_id')
username: str
role: str
xp: float
league: League
supporter: bool | None = None
verified: bool
country: str | None = None
ts: datetime
gamesplayed: int
gameswon: int
gametime: float
p: P

View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
from ..base import SuccessModel
from ..summaries.solo import Record
class Data(BaseModel):
entries: list[Record]
class SoloSuccessModel(SuccessModel):
data: Data

View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
from ..base import SuccessModel
from .base import Entry
class Data(BaseModel):
entries: list[Entry]
class XpSuccessModel(SuccessModel):
data: Data

View File

@@ -0,0 +1,12 @@
from pydantic import BaseModel
from ..base import SuccessModel
from ..summaries.zenith import Record
class Data(BaseModel):
entries: list[Record]
class ZenithSuccessModel(SuccessModel):
data: Data

View File

@@ -4,9 +4,9 @@ from pydantic import BaseModel
class User(BaseModel):
id: str
username: str
avatar_revision: int
banner_revision: int
country: str
avatar_revision: int | None
banner_revision: int | None
country: str | None
verified: int
supporter: int
@@ -21,9 +21,3 @@ class Finesse(BaseModel):
combo: int
faults: int
perfectpieces: int
class P(BaseModel): # what is P
pri: float
sec: float
ter: float

View File

@@ -3,8 +3,8 @@ from typing import Literal, TypeAlias
from pydantic import BaseModel, Field
from ..base import FailedModel, SuccessModel
from .base import AggregateStats, Finesse, P, User
from ..base import FailedModel, P, SuccessModel
from .base import AggregateStats, Finesse, User
class Time(BaseModel):
@@ -34,7 +34,7 @@ class Clears(BaseModel):
class Garbage(BaseModel):
sent: int
received: int
attack: int
attack: int | None
cleared: int

View File

@@ -3,8 +3,8 @@ from typing import Literal, TypeAlias
from pydantic import BaseModel, Field
from ..base import FailedModel, SuccessModel
from .base import AggregateStats, Finesse, P, User
from ..base import FailedModel, P, SuccessModel
from .base import AggregateStats, Finesse, User
class Clears(BaseModel):
@@ -76,7 +76,7 @@ class Stats(BaseModel):
kills: int
finesse: Finesse
zenith: _Zenith
finaltime: int
finaltime: float
class Results(BaseModel):

View File

@@ -1,34 +1,10 @@
from datetime import datetime
from nonebot_plugin_orm import Model
from sqlalchemy import JSON, DateTime, String
from sqlalchemy import String
from sqlalchemy.orm import Mapped, MappedAsDataclass, mapped_column
from .api.typing import ValidRank
from .typing import Template
class IORank(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(init=False, primary_key=True)
rank: Mapped[ValidRank] = mapped_column(String(2), index=True)
tr_line: Mapped[float]
player_count: Mapped[int]
low_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
low_apm: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
low_vs: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
avg_pps: Mapped[float]
avg_apm: Mapped[float]
avg_vs: Mapped[float]
high_pps: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
high_apm: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
high_vs: Mapped[tuple[dict[str, str], float]] = mapped_column(JSON)
update_time: Mapped[datetime] = mapped_column(
DateTime,
index=True,
)
file_hash: Mapped[str | None] = mapped_column(String(128), index=True)
class TETRIOUserConfig(MappedAsDataclass, Model):
id: Mapped[int] = mapped_column(primary_key=True)
query_template: Mapped[Template] = mapped_column(String(2))

View File

@@ -81,6 +81,7 @@ async def make_blitz_image(player: Player) -> bytes:
page=await render(
'v2/tetrio/record/blitz',
Record(
type='personal_best',
user=User(
id=user.ID,
name=user.name.upper(),
@@ -93,6 +94,7 @@ async def make_blitz_image(player: Player) -> bytes:
),
replay_id=blitz.data.record.replayid,
rank=blitz.data.rank,
personal_rank=1,
statistic=Statistic(
keys=stats.inputs,
kpp=round(stats.inputs / stats.piecesplaced, 2),

View File

@@ -18,8 +18,8 @@ from ....utils.host import HostPage, get_self_netloc
from ....utils.metrics import get_metrics
from ....utils.render import render
from ....utils.render.schemas.base import Avatar
from ....utils.render.schemas.tetrio.tetrio_record_base import Finesse, Max, Mini, Tspins, User
from ....utils.render.schemas.tetrio.tetrio_record_sprint import Record, Statistic
from ....utils.render.schemas.tetrio.tetrio_record_base import Finesse, Max, Mini, Statistic, Tspins, User
from ....utils.render.schemas.tetrio.tetrio_record_sprint import Record
from ....utils.screenshot import screenshot
from ....utils.typing import Me
from ...constant import CANT_VERIFY_MESSAGE
@@ -82,6 +82,7 @@ async def make_sprint_image(player: Player) -> bytes:
page=await render(
'v2/tetrio/record/40l',
Record(
type='personal_best',
user=User(
id=user.ID,
name=user.name.upper(),
@@ -95,6 +96,7 @@ async def make_sprint_image(player: Player) -> bytes:
time=sprint_value,
replay_id=sprint.data.record.replayid,
rank=sprint.data.rank,
personal_rank=1,
statistic=Statistic(
keys=stats.inputs,
kpp=round(stats.inputs / stats.piecesplaced, 2),

View File

@@ -11,7 +11,7 @@ from nonebot.log import logger
from ..config.config import CACHE_PATH
from .image import img_to_png
from .request import Request
from .templates import templates_dir
from .templates import TEMPLATES_DIR
if TYPE_CHECKING:
from pydantic import IPvAnyAddress
@@ -48,7 +48,7 @@ class HostPage:
def _():
app.mount(
'/host/assets',
StaticFiles(directory=templates_dir / 'assets'),
StaticFiles(directory=TEMPLATES_DIR / 'assets'),
name='assets',
)
logger.success('assets mounted')

View File

@@ -3,11 +3,12 @@ from typing import Literal, overload
from jinja2 import Environment, FileSystemLoader
from nonebot.compat import PYDANTIC_V2
from ..templates import templates_dir
from ..templates import TEMPLATES_DIR
from .schemas.bind import Bind
from .schemas.tetrio.tetrio_info import Info as TETRIOInfo
from .schemas.tetrio.tetrio_rank import Data as TETRIORankData
from .schemas.tetrio.tetrio_rank_detail import Data as TETRIORankDetailData
from .schemas.tetrio.tetrio_rank_v1 import Data as TETRIORankDataV1
from .schemas.tetrio.tetrio_rank_v2 import Data as TETRIORankDataV2
from .schemas.tetrio.tetrio_record_blitz import Record as TETRIORecordBlitz
from .schemas.tetrio.tetrio_record_sprint import Record as TETRIORecordSprint
from .schemas.tetrio.tetrio_user_info_v2 import Info as TETRIOUserInfoV2
@@ -16,7 +17,7 @@ from .schemas.top_info import Info as TOPInfo
from .schemas.tos_info import Info as TOSInfo
env = Environment(
loader=FileSystemLoader(templates_dir), autoescape=True, trim_blocks=True, lstrip_blocks=True, enable_async=True
loader=FileSystemLoader(TEMPLATES_DIR), autoescape=True, trim_blocks=True, lstrip_blocks=True, enable_async=True
)
@@ -25,6 +26,8 @@ async def render(render_type: Literal['v1/binding'], data: Bind) -> str: ...
@overload
async def render(render_type: Literal['v1/tetrio/info'], data: TETRIOInfo) -> str: ...
@overload
async def render(render_type: Literal['v1/tetrio/rank'], data: TETRIORankDataV1) -> str: ...
@overload
async def render(render_type: Literal['v1/top/info'], data: TOPInfo) -> str: ...
@overload
async def render(render_type: Literal['v1/tos/info'], data: TOSInfo) -> str: ...
@@ -37,7 +40,7 @@ async def render(render_type: Literal['v2/tetrio/record/40l'], data: TETRIORecor
@overload
async def render(render_type: Literal['v2/tetrio/record/blitz'], data: TETRIORecordBlitz) -> str: ...
@overload
async def render(render_type: Literal['v2/tetrio/rank'], data: TETRIORankData) -> str: ...
async def render(render_type: Literal['v2/tetrio/rank'], data: TETRIORankDataV2) -> str: ...
@overload
async def render(render_type: Literal['v2/tetrio/rank/detail'], data: TETRIORankDetailData) -> str: ...
@@ -46,6 +49,7 @@ async def render(
render_type: Literal[
'v1/binding',
'v1/tetrio/info',
'v1/tetrio/rank',
'v1/top/info',
'v1/tos/info',
'v2/tetrio/user/info',
@@ -57,13 +61,14 @@ async def render(
],
data: Bind
| TETRIOInfo
| TETRIORankDataV1
| TOPInfo
| TOSInfo
| TETRIOUserInfoV2
| TETRIOUserListV2
| TETRIORecordSprint
| TETRIORecordBlitz
| TETRIORankData
| TETRIORankDataV2
| TETRIORankDetailData,
) -> str:
if PYDANTIC_V2:

View File

@@ -0,0 +1,16 @@
from datetime import datetime
from pydantic import BaseModel
from .....games.tetrio.api.typing import ValidRank
class ItemData(BaseModel):
trending: float
require_tr: float
players: int
class Data(BaseModel):
items: dict[ValidRank, ItemData]
updated_at: datetime

View File

@@ -1,3 +1,6 @@
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from ..base import People
@@ -32,7 +35,7 @@ class Finesse(BaseModel):
accuracy: float
class RecordStatistic(BaseModel):
class Statistic(BaseModel):
keys: int
kpp: float
kps: float
@@ -56,3 +59,17 @@ class RecordStatistic(BaseModel):
all_clear: int
finesse: Finesse
class Record(BaseModel):
type: Literal['best', 'personal_best', 'recent', 'disputed']
user: User
replay_id: str
rank: int | None
personal_rank: int | None
statistic: Statistic
play_at: datetime

View File

@@ -1,22 +1,12 @@
from datetime import datetime
from pydantic import BaseModel
from .tetrio_record_base import RecordStatistic, User
from .tetrio_record_base import Record as BaseRecord
from .tetrio_record_base import Statistic as BaseStatistic
class Statistic(RecordStatistic):
class Statistic(BaseStatistic):
spp: float
level: int
class Record(BaseModel):
user: User
replay_id: str
rank: int | None
class Record(BaseRecord):
statistic: Statistic
play_at: datetime

View File

@@ -1,18 +1,5 @@
from datetime import datetime
from pydantic import BaseModel
from .tetrio_record_base import RecordStatistic as Statistic
from .tetrio_record_base import User
from .tetrio_record_base import Record as BaseRecord
class Record(BaseModel):
user: User
class Record(BaseRecord):
time: str
replay_id: str
rank: int | None
statistic: Statistic
play_at: datetime

View File

@@ -1,186 +1,130 @@
from asyncio.subprocess import PIPE, Process, create_subprocess_exec
from enum import Enum, auto
from hashlib import sha256
from http import HTTPStatus
from pathlib import Path
from shutil import rmtree
from typing import NamedTuple
from time import time_ns
from zipfile import ZipFile
from aiofiles import open
from httpx import AsyncClient
from nonebot import get_driver
from nonebot.log import logger
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import Alconna, Args, Option, on_alconna
from nonebot_plugin_alconna.uniseg import UniMessage
from nonebot_plugin_localstore import get_data_dir # type: ignore[import-untyped]
from nonebot_plugin_localstore import get_cache_file, get_data_dir
from rich.progress import Progress
driver = get_driver()
templates_dir = get_data_dir('nonebot_plugin_tetris_stats') / 'templates'
TEMPLATES_DIR = get_data_dir('nonebot_plugin_tetris_stats') / 'templates'
alc = on_alconna(Alconna('更新模板', Option('--revision', Args['revision', str], alias={'-R'})), permission=SUPERUSER)
logger.level('GIT', no=10, color='<blue>')
async def download_templates(tag: str) -> Path:
logger.info(f'开始下载模板 {tag}')
async with AsyncClient() as client:
if tag == 'latest':
logger.info('目标为 latest, 正在获取最新版本号')
tag = (
(
await client.get(
'https://github.com/A-Minos/tetris-stats-templates/releases/latest', follow_redirects=True
)
)
.url.path.strip('/')
.rsplit('/', 1)[-1]
)
logger.success(f'获取到的最新版本号: {tag}')
path = get_cache_file('nonebot_plugin_tetris_stats', f'dist_{time_ns()}.zip')
with Progress() as progress:
task_id = progress.add_task('[red]Downloading...', total=None)
async with (
client.stream(
'GET',
f'https://github.com/A-Minos/tetris-stats-templates/releases/download/{tag}/dist.zip',
follow_redirects=True,
) as response,
open(path, 'wb') as file,
):
response.raise_for_status()
progress.update(task_id, total=int(response.headers.get('content-length', 0)) or None)
async for chunk in response.aiter_bytes():
await file.write(chunk)
progress.update(task_id, advance=len(chunk))
logger.success('模板下载完成')
return path
class Status(Enum):
OK = auto()
NOT_EXIST = auto()
NOT_INITIALIZATION = auto()
async def unzip_templates(zip_path: Path) -> Path:
logger.info('开始解压模板')
temp_path = TEMPLATES_DIR.parent / f'temp_{time_ns()}'
with ZipFile(zip_path) as zip_file:
zip_file.extractall(temp_path)
zip_path.unlink()
logger.success('模板解压完成')
return temp_path
class Output(NamedTuple):
stdout: list[str]
stderr: list[str]
async def parse_log(proc: Process) -> Output:
stdout, stderr = await proc.communicate()
for i in (out := stdout.decode().splitlines()):
logger.log('GIT', f'stdout: {i}')
# stderr 可能是 None
for i in (err := (stderr or b'').decode().splitlines()):
logger.log('GIT', f'stderr: {i}')
return Output(out, err)
async def check_git() -> None:
try:
await parse_log(await create_subprocess_exec('git', '--version', stdout=PIPE))
except FileNotFoundError as e:
msg = '未找到 git, 请确保 git 已安装并在环境变量中\n安装步骤请参阅: https://git-scm.com/book/zh/v2/%E8%B5%B7%E6%AD%A5-%E5%AE%89%E8%A3%85-Git'
raise RuntimeError(msg) from e
async def check_repo(repo_path: Path) -> Status:
if not repo_path.exists():
return Status.NOT_EXIST
proc = await create_subprocess_exec(
'git', 'rev-parse', '--is-inside-work-tree', stdout=PIPE, stderr=PIPE, cwd=repo_path
)
await parse_log(proc)
if proc.returncode != 0:
return Status.NOT_INITIALIZATION
return Status.OK
async def clone_repo(repo_url: str, repo_path: Path, branch: str | None = None, depth: int | None = 1) -> bool:
args: list[str | Path] = ['git', 'clone', repo_url, repo_path]
if branch is not None:
args.extend(['-b', branch])
if depth is not None:
args.append(f'--depth={depth}')
proc = await create_subprocess_exec(*args, stdout=PIPE, stderr=PIPE)
await parse_log(proc)
return proc.returncode == 0
async def checkout(revision: str, repo_path: Path) -> bool:
proc = await create_subprocess_exec('git', 'checkout', revision, stdout=PIPE, stderr=PIPE, cwd=repo_path)
await parse_log(proc)
return proc.returncode == 0
async def init_templates() -> None:
await check_git()
status = await check_repo(templates_dir)
if status == Status.OK:
return
if status == Status.NOT_EXIST:
logger.info('模板仓库不存在, 正在尝试初始化...')
if status == Status.NOT_INITIALIZATION:
logger.warning('模板仓库状态异常, 尝试重新初始化')
rmtree(templates_dir)
if not await clone_repo(
repo_url='https://github.com/A-Minos/tetris-stats-templates', repo_path=templates_dir, branch='gh-pages'
):
msg = '模板仓库初始化失败'
raise RuntimeError(msg)
logger.success('模板仓库初始化成功')
async def update_templates(repo_path: Path) -> bool:
logger.info('开始更新模板仓库...')
logger.info('拉取最新提交')
proc = await create_subprocess_exec('git', 'fetch', '--all', '--tags', stdout=PIPE, stderr=PIPE, cwd=repo_path)
await parse_log(proc)
if proc.returncode != 0:
logger.error('拉取最新提交失败')
return False
logger.success('拉取最新提交成功')
async def check_hash(hash_file_path: Path) -> bool:
logger.info('开始校验模板哈希值')
for i in hash_file_path.read_text().splitlines():
file_sha256, file_relative_path = i.split(maxsplit=1)
file_path = hash_file_path.parent / file_relative_path
hasher = sha256()
if not file_path.is_file():
logger.error(f'{file_path.name} 不存在或不是文件')
return False
async with open(file_path, 'rb') as file:
while True:
chunk = await file.read(65535)
if not chunk:
break
hasher.update(chunk)
if hasher.hexdigest() != file_sha256:
logger.error(f'{file_path.name} hash 不匹配')
return False
logger.debug(f'{file_path.name} hash 匹配成功')
logger.success('模板哈希值校验成功')
return True
async def check_commit_hash(commit_hash: str, repo_path: Path, branch: str | None = None) -> bool:
output = await parse_log(
proc := await create_subprocess_exec(
'git', 'branch', '--contains', commit_hash, stdout=PIPE, stderr=PIPE, cwd=repo_path
)
)
return (
proc.returncode == 0
and len(output.stdout) > 0
and (branch is None or branch in output.stdout[0] or 'HEAD detached at' in output.stdout[0])
)
async def init_templates(tag: str) -> bool:
logger.info(f'开始初始化模板 {tag}')
temp_path = await unzip_templates(await download_templates(tag))
if not await check_hash(temp_path / 'hash.sha256'):
rmtree(temp_path)
return False
if TEMPLATES_DIR.exists():
logger.info('清除旧模板文件')
rmtree(TEMPLATES_DIR)
temp_path.rename(TEMPLATES_DIR)
logger.info('模板初始化完成')
return True
async def handle_tag(tag: str) -> str | None:
tags = (
await parse_log(await create_subprocess_exec('git', 'tag', stdout=PIPE, stderr=PIPE, cwd=templates_dir))
).stdout
if tag not in tags:
logger.debug(f'{tag} 不为 tag')
return None
logger.info(f'{tag} 为 tag, 正在尝试 checkout 到 tag 对应的 gh-pages commit')
tag_commit_hash = (
(
await parse_log(
await create_subprocess_exec(
'git', 'show-ref', '--tags', tag, stdout=PIPE, stderr=PIPE, cwd=templates_dir
)
)
)
.stdout[0]
.split(maxsplit=1)[0]
)
logger.success(f'tag 的 commit 为 {tag_commit_hash}')
commit_hash = (
await parse_log(
await create_subprocess_exec(
'git',
'log',
'gh-pages',
'--grep',
f'deploy: {tag_commit_hash}',
'--pretty=format:%H',
stdout=PIPE,
stderr=PIPE,
cwd=templates_dir,
)
)
).stdout[0]
logger.info(f'找到疑似的 gh-pages commit {commit_hash}')
if await check_commit_hash(commit_hash, templates_dir, branch='gh-pages'):
logger.success('验证成功')
return commit_hash
logger.error('验证失败')
return None
@alc.handle()
async def _(revision: str):
if not await update_templates(templates_dir):
msg = '模板仓库更新失败'
logger.error(msg)
await UniMessage(msg).finish()
commit_hash = await handle_tag(revision)
if commit_hash is not None:
if await checkout(commit_hash, templates_dir):
msg = f'模板成功 checkout 到 {commit_hash}'
logger.success(msg)
await alc.finish(msg)
else:
logger.error('checkout 失败')
await alc.finish('checkout 失败')
async def check_tag(tag: str) -> bool:
async with AsyncClient() as client:
return (
await client.get(f'https://github.com/A-Minos/tetris-stats-templates/releases/tag/{tag}')
).status_code != HTTPStatus.NOT_FOUND
@driver.on_startup
async def _():
await init_templates()
if (path := (TEMPLATES_DIR / 'hash.sha256')).is_file() and await check_hash(path):
logger.success('模板验证成功')
return
if not await init_templates('latest'):
msg = '模板初始化失败'
raise RuntimeError(msg)
@alc.handle()
async def _(revision: str | None = None):
if revision is not None and not await check_tag(revision):
await alc.finish(f'{revision} 不是模板仓库中的有效标签')
logger.info('开始更新模板')
if await init_templates(revision or 'latest'):
await alc.finish('更新模板成功')
await alc.finish('更新模板失败')

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = 'nonebot-plugin-tetris-stats'
version = '1.4.1'
version = '1.4.2'
description = '一款基于 NoneBot2 的用于查询 Tetris 相关游戏数据的插件'
authors = ['scdhh <wallfjjd@gmail.com>']
readme = 'README.md'