| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- """init auth models
- Revision ID: 20260425_0001
- Revises:
- Create Date: 2026-04-25 23:55:00
- """
- from collections.abc import Sequence
- import sqlalchemy as sa
- from alembic import op
# Revision identifiers for this migration script (the module imports
# alembic's `op`); the value below repeats the "Revision ID" given in the
# module docstring.
- revision: str = "20260425_0001"
# None matches the empty "Revises:" field in the module docstring.
# NOTE(review): presumably this marks the script as a base revision with no
# parent -- confirm against the project's migration history.
- down_revision: str | None = None
# No branch labels and no cross-revision dependencies are declared.
- branch_labels: Sequence[str] | None = None
- depends_on: Sequence[str] | None = None
def upgrade() -> None:
    """Create the auth tables and their single-column indexes when absent.

    Three tables are handled, in this order: ``auth_user``, ``auth_role`` and
    ``auth_role_assignment``. A table is only created when ``_has_table``
    reports it missing, and every index goes through
    ``_create_index_if_missing``, so existing tables and indexes are left
    as they are.
    """

    def shared_trailing_columns() -> list:
        # Columns (and the primary key) that close every auth table. They are
        # built fresh on each call so no Column object is shared between
        # tables.
        return [
            sa.Column("id", sa.String(length=36), nullable=False),
            sa.Column("created_by", sa.String(length=36), nullable=True),
            sa.Column("updated_by", sa.String(length=36), nullable=True),
            sa.Column("created_time", sa.DateTime(), nullable=False),
            sa.Column("updated_time", sa.DateTime(), nullable=False),
            sa.Column("deleted_time", sa.DateTime(), nullable=True),
            sa.Column("version", sa.Integer(), nullable=False),
            sa.PrimaryKeyConstraint("id"),
        ]

    # Table-specific leading columns, keyed by table name. Dict insertion
    # order is the creation order.
    leading_columns = {
        "auth_user": [
            sa.Column("username", sa.String(length=128), nullable=False),
            sa.Column("display_name", sa.String(length=128), nullable=True),
            sa.Column("email", sa.String(length=256), nullable=True),
            sa.Column("status", sa.String(length=32), nullable=False),
            sa.Column("metadata_json", sa.JSON(), nullable=False),
            sa.Column("last_login_time", sa.DateTime(), nullable=True),
        ],
        "auth_role": [
            sa.Column("code", sa.String(length=128), nullable=False),
            sa.Column("name", sa.String(length=128), nullable=False),
            sa.Column("description", sa.Text(), nullable=True),
            sa.Column("status", sa.String(length=32), nullable=False),
            sa.Column("permissions_json", sa.JSON(), nullable=False),
        ],
        "auth_role_assignment": [
            sa.Column("user_id", sa.String(length=36), nullable=False),
            sa.Column("role_id", sa.String(length=36), nullable=False),
            sa.Column("status", sa.String(length=32), nullable=False),
            sa.Column("scope_type", sa.String(length=64), nullable=True),
            sa.Column("scope_id", sa.String(length=64), nullable=True),
            sa.Column("expires_time", sa.DateTime(), nullable=True),
        ],
    }

    for name, own_columns in leading_columns.items():
        if _has_table(name):
            continue
        op.create_table(name, *own_columns, *shared_trailing_columns())

    # Every table gets an index on its "status" column first ...
    for table_name in leading_columns:
        _create_index_if_missing(f"ix_{table_name}_status", table_name, ["status"])

    # ... followed by the remaining single-column indexes, in a fixed order.
    remaining_indexes = (
        ("ix_auth_user_username", "auth_user", "username"),
        ("ix_auth_user_email", "auth_user", "email"),
        ("ix_auth_role_code", "auth_role", "code"),
        ("ix_auth_role_assignment_user_id", "auth_role_assignment", "user_id"),
        ("ix_auth_role_assignment_role_id", "auth_role_assignment", "role_id"),
        ("ix_auth_role_assignment_scope_type", "auth_role_assignment", "scope_type"),
        ("ix_auth_role_assignment_scope_id", "auth_role_assignment", "scope_id"),
        ("ix_auth_role_assignment_expires_time", "auth_role_assignment", "expires_time"),
    )
    for index_name, table_name, column_name in remaining_indexes:
        _create_index_if_missing(index_name, table_name, [column_name])
def downgrade() -> None:
    """Drop the auth tables, skipping any that are already gone.

    Mirrors the existence checks in ``upgrade()``: each table is dropped only
    when ``_has_table`` reports it present, so a partially applied or
    partially cleaned-up schema no longer aborts the downgrade on the first
    missing table.

    Because ``_has_table`` inspects the current bind, this function (like
    ``upgrade()``) needs a live database connection.

    NOTE(review): no explicit index drops are issued; this relies on the
    database removing a table's indexes together with the table.
    """
    # Same drop order as before: assignments first, then roles, then users.
    for table_name in ("auth_role_assignment", "auth_role", "auth_user"):
        if _has_table(table_name):
            op.drop_table(table_name)
def _has_table(table_name: str) -> bool:
    """Return whether ``table_name`` exists according to the migration's bind.

    A new inspector is built from ``op.get_bind()`` on every call.
    """
    connection = op.get_bind()
    inspector = sa.inspect(connection)
    return inspector.has_table(table_name)
def _create_index_if_missing(index_name: str, table_name: str, columns: list[str]) -> None:
    """Create ``index_name`` on ``table_name`` unless that name already exists.

    Only index names are compared; the columns of an existing index with the
    same name are not checked against ``columns``.
    """
    reflected_indexes = sa.inspect(op.get_bind()).get_indexes(table_name)
    known_names = [entry["name"] for entry in reflected_indexes]
    if index_name in known_names:
        return
    op.create_index(index_name, table_name, columns)
|