食谱#
一系列“操作指南”,重点介绍扩展 Alembic 的常用方法。
注意
这是一个新部分,我们根据用户请求编制了各种“操作指南”。通常情况下,用户会请求一项功能,然后才了解到可以通过简单的自定义来提供该功能。
从头开始构建最新数据库#
有一种数据库迁移理论认为,数据库的现有版本应该能够从一个完全空白的架构转到最终产品,然后再返回。Alembic 可以这样滚动。尽管我们认为这有点矫枉过正,因为 SQLAlchemy 本身可以使用 create_all()
为任何给定的模型发出完整的 CREATE 语句。如果您签出应用程序的副本,运行此命令将一次性为您提供整个数据库,而无需运行所有那些迁移文件,而这些文件专门用于对现有数据库应用增量更改。
Alembic 可以非常轻松地与 create_all()
脚本集成。在运行创建操作后,告诉 Alembic 创建一个新版本表,并用最新版本(即 head
)对其进行标记
# inside of a "create the database" script, first create
# tables:
my_metadata.create_all(engine)
# then, load the Alembic configuration and generate the
# version table, "stamping" it with the most recent rev:
from alembic.config import Config
from alembic import command
alembic_cfg = Config("/path/to/yourapp/alembic.ini")
command.stamp(alembic_cfg, "head")
使用此方法时,应用程序可以使用常规 SQLAlchemy 技术生成数据库,而不是遍历数百个迁移脚本。现在,迁移脚本的目的仅限于在过时数据库上版本之间的移动,而不是新数据库。您现在可以删除不再在任何现有环境中表示的旧迁移文件。
要剪除旧迁移文件,只需删除文件即可。然后,在最早的、仍然保留的迁移文件中,将 down_revision
设置为 None
# replace this:
#down_revision = '290696571ad2'
# with this:
down_revision = None
该文件现在成为迁移系列的“基础”。
条件迁移元素#
此示例介绍了一个常见需求的基本思路,即根据命令行开关影响迁移的运行方式。
此处使用的技术很简单;在迁移脚本中,检查 EnvironmentContext.get_x_argument()
集合中是否有任何其他用户定义的参数。然后根据这些参数的存在采取措施。
为了使检查这些标志的逻辑易于使用和修改,我们修改 script.py.mako
模板,以便在所有新修订文件中提供此功能
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision}
Create Date: ${create_date}
"""
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
from alembic import context
def upgrade():
schema_upgrades()
if context.get_x_argument(as_dictionary=True).get('data', None):
data_upgrades()
def downgrade():
if context.get_x_argument(as_dictionary=True).get('data', None):
data_downgrades()
schema_downgrades()
def schema_upgrades():
"""schema upgrade migrations go here."""
${upgrades if upgrades else "pass"}
def schema_downgrades():
"""schema downgrade migrations go here."""
${downgrades if downgrades else "pass"}
def data_upgrades():
"""Add any optional data upgrade migrations here!"""
pass
def data_downgrades():
"""Add any optional data downgrade migrations here!"""
pass
现在,当我们创建新的迁移文件时,data_upgrades()
和 data_downgrades()
占位符将可用,我们可以在其中添加可选数据迁移
"""rev one
Revision ID: 3ba2b522d10d
Revises: None
Create Date: 2014-03-04 18:05:36.992867
"""
# revision identifiers, used by Alembic.
revision = '3ba2b522d10d'
down_revision = None
from alembic import op
import sqlalchemy as sa
from sqlalchemy import String, Column
from sqlalchemy.sql import table, column
from alembic import context
def upgrade():
schema_upgrades()
if context.get_x_argument(as_dictionary=True).get('data', None):
data_upgrades()
def downgrade():
if context.get_x_argument(as_dictionary=True).get('data', None):
data_downgrades()
schema_downgrades()
def schema_upgrades():
"""schema upgrade migrations go here."""
op.create_table("my_table", Column('data', String))
def schema_downgrades():
"""schema downgrade migrations go here."""
op.drop_table("my_table")
def data_upgrades():
"""Add any optional data upgrade migrations here!"""
my_table = table('my_table',
column('data', String),
)
op.bulk_insert(my_table,
[
{'data': 'data 1'},
{'data': 'data 2'},
{'data': 'data 3'},
]
)
def data_downgrades():
"""Add any optional data downgrade migrations here!"""
op.execute("delete from my_table")
要调用包含数据的迁移,我们使用 -x
标志
alembic -x data=true upgrade head
EnvironmentContext.get_x_argument()
是在环境和迁移脚本中支持新的命令行选项的简单方法。
可替换对象#
此配方提出了一种处理我们可能称之为可替换模式对象的方法。可替换对象是一个需要一次创建和删除的模式对象。此类对象的示例包括视图、存储过程和触发器。
另请参阅
可替换对象概念已由 Alembic Utils 项目集成,该项目为 PostgreSQL 函数和视图提供自动生成和迁移支持。请参阅 olirice/alembic_utils 中的 Alembic Utils。
可替换对象存在一个问题,即为了对其进行增量更改,我们必须一次引用整个定义。例如,如果我们需要向视图添加新列,则必须完全删除它,并使用添加的额外列重新创建它,引用整个结构;但是,为了让它更难,如果我们希望在迁移脚本中支持降级操作,我们需要完全引用该构造的先前版本,我们更愿意不必在多个地方输入整个定义。
此配方建议我们可以通过直接命名创建它的迁移版本来引用可替换构造的旧版本,并让迁移在迁移运行时引用该先前文件。我们还将演示如何在 Alembic 0.8 中引入的 操作插件 功能中集成此逻辑。首先查看此部分以获取此 API 的概述可能非常有帮助。
可替换对象结构#
我们首先需要设计一个简单的格式,表示我们正在构建的“CREATE XYZ”/“DROP XYZ”方面。我们将使用一个表示文本定义的对象;虽然 SQL 视图是一个我们可以使用 表元数据类系统 定义的对象,但对于存储过程等内容来说并非如此,因为我们几乎需要在某处写下完整的字符串定义。我们将使用一个名为 ReplaceableObject
的简单值对象,它可以表示任何命名的 SQL 文本集,以发送到某种“CREATE”语句
class ReplaceableObject:
def __init__(self, name, sqltext):
self.name = name
self.sqltext = sqltext
在迁移脚本中使用此对象(假设使用 Postgresql 样式语法),如下所示
customer_view = ReplaceableObject(
"customer_view",
"SELECT name, order_count FROM customer WHERE order_count > 0"
)
add_customer_sp = ReplaceableObject(
"add_customer_sp(name varchar, order_count integer)",
"""
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count)
VALUES (in_name, in_order_count);
END;
$$ LANGUAGE plpgsql;
"""
)
ReplaceableObject
类只是执行此操作的一种非常简单的方法。我们表示架构对象的方式的结构对于本示例的目的来说并不重要;我们也可以将字符串放入元组或字典中,以及我们可以定义任何类型的字段和类结构。唯一重要的部分是,下面我们将说明如何组织可以消耗我们在此处创建的结构的代码。
目标对象的创建操作#
我们将使用 Operations
扩展 API 为创建、删除和替换视图和存储过程创建新操作。使用此 API 也是可选的;我们也可以创建任何类型的 Python 函数,以便从我们的迁移脚本中调用它。但是,使用此 API 可以非常好的将操作直接构建到 Alembic op.*
命名空间中。
最复杂的类如下。这是我们“可替换”操作的基础,它不仅包括在 ReplaceableObject
上发出 CREATE 和 DROP 指令的基本操作,还假定了一种“可逆性”模型,该模型利用对其他迁移文件的引用来引用对象的“先前”版本
from alembic.operations import Operations, MigrateOperation
class ReversibleOp(MigrateOperation):
def __init__(self, target):
self.target = target
@classmethod
def invoke_for_target(cls, operations, target):
op = cls(target)
return operations.invoke(op)
def reverse(self):
raise NotImplementedError()
@classmethod
def _get_object_from_version(cls, operations, ident):
version, objname = ident.split(".")
module = operations.get_context().script.get_revision(version).module
obj = getattr(module, objname)
return obj
@classmethod
def replace(cls, operations, target, replaces=None, replace_with=None):
if replaces:
old_obj = cls._get_object_from_version(operations, replaces)
drop_old = cls(old_obj).reverse()
create_new = cls(target)
elif replace_with:
old_obj = cls._get_object_from_version(operations, replace_with)
drop_old = cls(target).reverse()
create_new = cls(old_obj)
else:
raise TypeError("replaces or replace_with is required")
operations.invoke(drop_old)
operations.invoke(create_new)
当我们逐步完成示例时,这个类的作用应该会变得清晰。为了从这个基础创建可用的操作,我们将构建一系列存根类,并使用 Operations.register_operation()
使它们成为 op.*
命名空间的一部分
@Operations.register_operation("create_view", "invoke_for_target")
@Operations.register_operation("replace_view", "replace")
class CreateViewOp(ReversibleOp):
def reverse(self):
return DropViewOp(self.target)
@Operations.register_operation("drop_view", "invoke_for_target")
class DropViewOp(ReversibleOp):
def reverse(self):
return CreateViewOp(self.target)
@Operations.register_operation("create_sp", "invoke_for_target")
@Operations.register_operation("replace_sp", "replace")
class CreateSPOp(ReversibleOp):
def reverse(self):
return DropSPOp(self.target)
@Operations.register_operation("drop_sp", "invoke_for_target")
class DropSPOp(ReversibleOp):
def reverse(self):
return CreateSPOp(self.target)
要实际运行类似“CREATE VIEW”和“DROP SEQUENCE”的 SQL,我们将提供使用 Operations.implementation_for()
的实现,直接运行到 Operations.execute()
@Operations.implementation_for(CreateViewOp)
def create_view(operations, operation):
operations.execute("CREATE VIEW %s AS %s" % (
operation.target.name,
operation.target.sqltext
))
@Operations.implementation_for(DropViewOp)
def drop_view(operations, operation):
operations.execute("DROP VIEW %s" % operation.target.name)
@Operations.implementation_for(CreateSPOp)
def create_sp(operations, operation):
operations.execute(
"CREATE FUNCTION %s %s" % (
operation.target.name, operation.target.sqltext
)
)
@Operations.implementation_for(DropSPOp)
def drop_sp(operations, operation):
operations.execute("DROP FUNCTION %s" % operation.target.name)
以上所有代码都可以出现在应用程序源树中的任何位置;唯一的要求是当调用 env.py
脚本时,它包括最终调用这些类以及 Operations.register_operation()
和 Operations.implementation_for()
序列的导入。
创建初始迁移#
我们现在可以说明这些对象在使用期间的外观。对于第一步,我们将创建一个新迁移来创建一个“customer”表
$ alembic revision -m "create table"
我们构建第一个修订版如下
"""create table
Revision ID: 3ab8b2dfb055
Revises:
Create Date: 2015-07-27 16:22:44.918507
"""
# revision identifiers, used by Alembic.
revision = '3ab8b2dfb055'
down_revision = None
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
"customer",
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('name', sa.String),
sa.Column('order_count', sa.Integer),
)
def downgrade():
op.drop_table('customer')
对于第二个迁移,我们将创建一个视图和一个存储过程,对该表进行操作
$ alembic revision -m "create views/sp"
此迁移将使用新指令
"""create views/sp
Revision ID: 28af9800143f
Revises: 3ab8b2dfb055
Create Date: 2015-07-27 16:24:03.589867
"""
# revision identifiers, used by Alembic.
revision = '28af9800143f'
down_revision = '3ab8b2dfb055'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from foo import ReplaceableObject
customer_view = ReplaceableObject(
"customer_view",
"SELECT name, order_count FROM customer WHERE order_count > 0"
)
add_customer_sp = ReplaceableObject(
"add_customer_sp(name varchar, order_count integer)",
"""
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count)
VALUES (in_name, in_order_count);
END;
$$ LANGUAGE plpgsql;
"""
)
def upgrade():
op.create_view(customer_view)
op.create_sp(add_customer_sp)
def downgrade():
op.drop_view(customer_view)
op.drop_sp(add_customer_sp)
我们看到了我们新的 create_view()
、create_sp()
、drop_view()
和 drop_sp()
指令的使用。将它们运行到“head”,我们得到以下内容(这包括对发出的 SQL 的编辑视图)
$ alembic upgrade 28af9800143
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [sqlalchemy.engine.base.Engine] BEGIN (implicit)
INFO [sqlalchemy.engine.base.Engine] select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO [sqlalchemy.engine.base.Engine] {'name': u'alembic_version'}
INFO [sqlalchemy.engine.base.Engine] SELECT alembic_version.version_num
FROM alembic_version
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO [sqlalchemy.engine.base.Engine] {'name': u'alembic_version'}
INFO [alembic.runtime.migration] Running upgrade -> 3ab8b2dfb055, create table
INFO [sqlalchemy.engine.base.Engine]
CREATE TABLE customer (
id SERIAL NOT NULL,
name VARCHAR,
order_count INTEGER,
PRIMARY KEY (id)
)
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] INSERT INTO alembic_version (version_num) VALUES ('3ab8b2dfb055')
INFO [sqlalchemy.engine.base.Engine] {}
INFO [alembic.runtime.migration] Running upgrade 3ab8b2dfb055 -> 28af9800143f, create views/sp
INFO [sqlalchemy.engine.base.Engine] CREATE VIEW customer_view AS SELECT name, order_count FROM customer WHERE order_count > 0
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] CREATE FUNCTION add_customer_sp(name varchar, order_count integer)
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count)
VALUES (in_name, in_order_count);
END;
$$ LANGUAGE plpgsql;
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] UPDATE alembic_version SET version_num='28af9800143f' WHERE alembic_version.version_num = '3ab8b2dfb055'
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] COMMIT
我们看到我们的 CREATE TABLE 以及我们的新指令产生的 CREATE VIEW 和 CREATE FUNCTION 操作也已进行。
创建修订版迁移#
最后,我们可以说明如何“修订”这些对象。让我们考虑我们向 customer
表中添加了一个新列 email
$ alembic revision -m "add email col"
迁移是
"""add email col
Revision ID: 191a2d20b025
Revises: 28af9800143f
Create Date: 2015-07-27 16:25:59.277326
"""
# revision identifiers, used by Alembic.
revision = '191a2d20b025'
down_revision = '28af9800143f'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column("customer", sa.Column("email", sa.String()))
def downgrade():
op.drop_column("customer", "email")
我们现在需要重新创建customer_view
视图和add_customer_sp
函数。为了包含降级功能,我们需要引用该构造的先前版本;我们创建的replace_view()
和replace_sp()
操作通过允许我们引用特定的先前版本来实现这一点。 replaces
和replace_with
参数接受一个点分隔的字符串,该字符串引用版本号和对象名称,例如"28af9800143f.customer_view"
。 ReversibleOp
类利用Operations.get_context()
方法来查找我们引用的版本文件
$ alembic revision -m "update views/sp"
迁移
"""update views/sp
Revision ID: 199028bf9856
Revises: 191a2d20b025
Create Date: 2015-07-27 16:26:31.344504
"""
# revision identifiers, used by Alembic.
revision = '199028bf9856'
down_revision = '191a2d20b025'
branch_labels = None
depends_on = None
from alembic import op
import sqlalchemy as sa
from foo import ReplaceableObject
customer_view = ReplaceableObject(
"customer_view",
"SELECT name, order_count, email "
"FROM customer WHERE order_count > 0"
)
add_customer_sp = ReplaceableObject(
"add_customer_sp(name varchar, order_count integer, email varchar)",
"""
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count, email)
VALUES (in_name, in_order_count, email);
END;
$$ LANGUAGE plpgsql;
"""
)
def upgrade():
op.replace_view(customer_view, replaces="28af9800143f.customer_view")
op.replace_sp(add_customer_sp, replaces="28af9800143f.add_customer_sp")
def downgrade():
op.replace_view(customer_view, replace_with="28af9800143f.customer_view")
op.replace_sp(add_customer_sp, replace_with="28af9800143f.add_customer_sp")
在上面,我们不再使用create_view()
、create_sp()
、drop_view()
和drop_sp()
方法,而是使用replace_view()
和replace_sp()
。我们构建的替换操作始终运行DROP和CREATE。运行升级到头,我们看到
$ alembic upgrade head
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [sqlalchemy.engine.base.Engine] BEGIN (implicit)
INFO [sqlalchemy.engine.base.Engine] select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO [sqlalchemy.engine.base.Engine] {'name': u'alembic_version'}
INFO [sqlalchemy.engine.base.Engine] SELECT alembic_version.version_num
FROM alembic_version
INFO [sqlalchemy.engine.base.Engine] {}
INFO [alembic.runtime.migration] Running upgrade 28af9800143f -> 191a2d20b025, add email col
INFO [sqlalchemy.engine.base.Engine] ALTER TABLE customer ADD COLUMN email VARCHAR
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] UPDATE alembic_version SET version_num='191a2d20b025' WHERE alembic_version.version_num = '28af9800143f'
INFO [sqlalchemy.engine.base.Engine] {}
INFO [alembic.runtime.migration] Running upgrade 191a2d20b025 -> 199028bf9856, update views/sp
INFO [sqlalchemy.engine.base.Engine] DROP VIEW customer_view
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] CREATE VIEW customer_view AS SELECT name, order_count, email FROM customer WHERE order_count > 0
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] DROP FUNCTION add_customer_sp(name varchar, order_count integer)
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] CREATE FUNCTION add_customer_sp(name varchar, order_count integer, email varchar)
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count, email)
VALUES (in_name, in_order_count, email);
END;
$$ LANGUAGE plpgsql;
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] UPDATE alembic_version SET version_num='199028bf9856' WHERE alembic_version.version_num = '191a2d20b025'
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] COMMIT
在添加了我们的新email
列之后,我们看到customer_view
和add_customer_sp()
在创建新版本之前都被删除了。如果我们降级回旧版本,我们会在这次迁移的降级中再次看到这些旧版本的重新创建
$ alembic downgrade 28af9800143
INFO [alembic.runtime.migration] Context impl PostgresqlImpl.
INFO [alembic.runtime.migration] Will assume transactional DDL.
INFO [sqlalchemy.engine.base.Engine] BEGIN (implicit)
INFO [sqlalchemy.engine.base.Engine] select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where pg_catalog.pg_table_is_visible(c.oid) and relname=%(name)s
INFO [sqlalchemy.engine.base.Engine] {'name': u'alembic_version'}
INFO [sqlalchemy.engine.base.Engine] SELECT alembic_version.version_num
FROM alembic_version
INFO [sqlalchemy.engine.base.Engine] {}
INFO [alembic.runtime.migration] Running downgrade 199028bf9856 -> 191a2d20b025, update views/sp
INFO [sqlalchemy.engine.base.Engine] DROP VIEW customer_view
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] CREATE VIEW customer_view AS SELECT name, order_count FROM customer WHERE order_count > 0
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] DROP FUNCTION add_customer_sp(name varchar, order_count integer, email varchar)
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] CREATE FUNCTION add_customer_sp(name varchar, order_count integer)
RETURNS integer AS $$
BEGIN
insert into customer (name, order_count)
VALUES (in_name, in_order_count);
END;
$$ LANGUAGE plpgsql;
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] UPDATE alembic_version SET version_num='191a2d20b025' WHERE alembic_version.version_num = '199028bf9856'
INFO [sqlalchemy.engine.base.Engine] {}
INFO [alembic.runtime.migration] Running downgrade 191a2d20b025 -> 28af9800143f, add email col
INFO [sqlalchemy.engine.base.Engine] ALTER TABLE customer DROP COLUMN email
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] UPDATE alembic_version SET version_num='28af9800143f' WHERE alembic_version.version_num = '191a2d20b025'
INFO [sqlalchemy.engine.base.Engine] {}
INFO [sqlalchemy.engine.base.Engine] COMMIT
PostgreSQL 数据库的基本模式级多租户#
多租户是指同时为多个客户端提供服务的应用程序。在数据库迁移工具的范围内,多租户通常是指维护多个相同数据库的做法,其中每个数据库都分配给一个客户端。
Alembic 目前没有明确的多租户支持;通常,该方法必须涉及针对不同的数据库 URL 多次运行 Alembic。
一种常见的多租户方法,尤其是在 PostgreSQL 数据库中,是在各个 PostgreSQL 架构中安装租户。在使用 PostgreSQL 的架构时,提供了一个特殊变量 search_path
,旨在帮助定位不同的架构。
注意
SQLAlchemy 包含一个系统,用于将一组常见的 Table
元数据定向到称为 schema_translate_map 的多个架构。在撰写本文时,Alembic 缺乏对该功能的充分支持。以下方法应被视为临时方法,直到 Alembic 对架构级多租户提供更多的一流支持。
以下方法可以根据灵活性进行更改。该方法的主要目的是说明如何将 Alembic 进程指向一个 PostgreSQL 架构或另一个架构。
用作自动生成目标的模型元数据不得包含任何表的架构名称;架构必须不存在或设置为
None
。否则,Alembic 自动生成仍将尝试根据该架构比较和呈现表class A(Base): __tablename__ = 'a' id = Column(Integer, primary_key=True) data = Column(UnicodeText()) foo = Column(Integer) __table_args__ = { "schema": None }
EnvironmentContext.configure.include_schemas
标志也必须为 False 或不包含。使用“-x”标志将“租户”作为架构名称传递给 Alembic。在
env.py
中,如下所示的方法允许-xtenant=some_schema
通过使用EnvironmentContext.get_x_argument()
来支持from sqlalchemy import text def run_migrations_online(): connectable = engine_from_config( config.get_section(config.config_ini_section), prefix="sqlalchemy.", poolclass=pool.NullPool, ) current_tenant = context.get_x_argument(as_dictionary=True).get("tenant") with connectable.connect() as connection: # set search path on the connection, which ensures that # PostgreSQL will emit all CREATE / ALTER / DROP statements # in terms of this schema by default connection.execute(text('set search_path to "%s"' % current_tenant)) # in SQLAlchemy v2+ the search path change needs to be committed connection.commit() # make use of non-supported SQLAlchemy attribute to ensure # the dialect reflects tables in terms of the current tenant name connection.dialect.default_schema_name = current_tenant context.configure( connection=connection, target_metadata=target_metadata, ) with context.begin_transaction(): context.run_migrations()
当前租户使用连接上的 PostgreSQL
search_path
变量进行设置。请注意,我们目前必须采用一个不受支持的 SQLAlchemy 解决方法,即硬编码 SQLAlchemy 方言的默认架构名称为我们的目标架构。同样重要的是要注意,上述更改将永久保留在连接上,除非明确撤销。如果 alembic 应用程序只是退出,则没有问题。但是,如果应用程序尝试继续将上述连接用于其他目的,则可能需要将这些变量重置为默认值,对于 PostgreSQL,通常是名称“public”,但可能根据配置而有所不同。
Alembic 操作现在将根据我们在命令行上传递的任何架构进行。所有记录的 SQL 不会显示任何架构,但反射操作除外,反射操作将使用
default_schema_name
属性[]$ alembic -x tenant=some_schema revision -m "rev1" --autogenerate
由于所有架构都应保持同步,因此应仅针对一个架构运行自动生成,生成新的 Alembic 迁移文件。然后针对所有架构运行自动生成的迁移操作。
使用自动生成时不要生成空迁移#
一个常见请求是,如果没有检测到架构的更改,则 alembic revision --autogenerate
命令实际上不会生成修订文件。使用 EnvironmentContext.configure.process_revision_directives
钩子,这是直接的;在 MigrationContext.configure()
中放置一个 process_revision_directives
钩子,如果 MigrationScript
指令为空,则将其删除
# for typing purposes
from collections.abc import Iterable
from alembic.environment import MigrationContext
# this typing-only import requires alembic 1.12.1 or above
from alembic.operations import MigrationScript
def run_migrations_online():
# ...
def process_revision_directives(
context: MigrationContext,
revision: str | Iterable[str | None] | Iterable[str],
directives: list[MigrationScript],
):
assert config.cmd_opts is not None
if getattr(config.cmd_opts, 'autogenerate', False):
script = directives[0]
assert script.upgrade_ops is not None
if script.upgrade_ops.is_empty():
directives[:] = []
# connectable = ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives
)
with context.begin_transaction():
context.run_migrations()
当表也将被删除时,不要发出 DROP INDEX#
当删除针对其上也具有外键约束的列的索引时,MySQL 可能会抱怨。如果无论如何都要删除该表,则 DROP INDEX 不是必需的。此方法将处理自动生成指令集,以便删除所有 DropIndexOp
指令,这些指令针对的表本身将被删除
def run_migrations_online():
# ...
from alembic.operations import ops
def process_revision_directives(context, revision, directives):
script = directives[0]
# process both "def upgrade()", "def downgrade()"
for directive in (script.upgrade_ops, script.downgrade_ops):
# make a set of tables that are being dropped within
# the migration function
tables_dropped = set()
for op in directive.ops:
if isinstance(op, ops.DropTableOp):
tables_dropped.add((op.table_name, op.schema))
# now rewrite the list of "ops" such that DropIndexOp
# is removed for those tables. Needs a recursive function.
directive.ops = list(
_filter_drop_indexes(directive.ops, tables_dropped)
)
def _filter_drop_indexes(directives, tables_dropped):
# given a set of (tablename, schemaname) to be dropped, filter
# out DropIndexOp from the list of directives and yield the result.
for directive in directives:
# ModifyTableOps is a container of ALTER TABLE types of
# commands. process those in place recursively.
if isinstance(directive, ops.ModifyTableOps) and \
(directive.table_name, directive.schema) in tables_dropped:
directive.ops = list(
_filter_drop_indexes(directive.ops, tables_dropped)
)
# if we emptied out the directives, then skip the
# container altogether.
if not directive.ops:
continue
elif isinstance(directive, ops.DropIndexOp) and \
(directive.table_name, directive.schema) in tables_dropped:
# we found a target DropIndexOp. keep looping
continue
# otherwise if not filtered, yield out the directive
yield directive
# connectable = ...
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
process_revision_directives=process_revision_directives
)
with context.begin_transaction():
context.run_migrations()
而自动生成在删除具有外键和索引的两个表时,以前会生成类似于
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_b_aid'), table_name='b')
op.drop_table('b')
op.drop_table('a')
# ### end Alembic commands ###
使用上述重写器,它生成如下
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table('b')
op.drop_table('a')
# ### end Alembic commands ###
使用自动生成时,不要生成任何 DROP TABLE 指令#
在对存在于应用程序自动生成元数据之外的表的数据库运行自动生成时,可能希望防止自动生成考虑删除任何那些现有表。这将阻止自动生成检测从本地元数据中删除的表,但这也是一个很小的警告。
使用 EnvironmentContext.configure.include_object
钩子是实现此目的的最直接方式。无需硬编码表名的固定“白名单”;该钩子在给定参数中提供了足够的信息,以确定特定表名是否不是正在自动生成的本地 MetaData
的一部分,首先检查对象的类型是否为 "table"
,然后检查 reflected
是否为 True
,表示此表名来自本地数据库连接,而不是 MetaData
,最后检查 compare_to
是否为 None
,表示自动生成未将此 Table
与本地 MetaData
集合中的任何 Table
进行比较
# in env.py
def include_object(object, name, type_, reflected, compare_to):
if type_ == "table" and reflected and compare_to is None:
return False
else:
return True
context.configure(
# ...
include_object = include_object
)
在 CREATE TABLE 中对表列应用自定义排序#
此示例说明了在 使用重写器进行细粒度自动生成 中引入的 Rewriter
对象的使用。虽然重写器授予对各个 ops.MigrateOperation
对象的访问权限,但有时需要一些特殊技术来解决一些存在的结构限制。
一是尝试在一个 ops.CreateTableOp
指令中重新组织表中列的顺序时。当由自动生成生成时,此指令实际上保留了原始 Table
对象作为其信息源,因此尝试重新排序 ops.CreateTableOp.columns
集合通常不会产生任何效果。相反,可以构造一个新的 ops.CreateTableOp
对象,并使用新顺序。但是,第二个问题是内部的 Column
对象将已经与正在自动生成的模型中的 Table
相关联,这意味着它们不能直接重新分配给新的 Table
。为了解决这个问题,我们可以使用 Column.copy()
等方法复制所有列和约束。
下面我们使用 Rewriter
创建一个新的 ops.CreateTableOp
指令,并将 Column
对象从一个复制到另一个,复制每个列或约束对象并应用新的排序方案
# in env.py
from alembic.operations import ops
from alembic.autogenerate import rewriter
writer = rewriter.Rewriter()
@writer.rewrites(ops.CreateTableOp)
def order_columns(context, revision, op):
special_names = {"id": -100, "created_at": 1001, "updated_at": 1002}
cols_by_key = [
(
special_names.get(col.key, index)
if isinstance(col, Column)
else 2000,
col.copy(),
)
for index, col in enumerate(op.columns)
]
columns = [
col for idx, col in sorted(cols_by_key, key=lambda entry: entry[0])
]
return ops.CreateTableOp(
op.table_name, columns, schema=op.schema, **op.kw)
# ...
context.configure(
# ...
process_revision_directives=writer
)
上面,当我们将 writer
应用到诸如
Table(
"my_table",
m,
Column("data", String(50)),
Column("created_at", DateTime),
Column("id", Integer, primary_key=True),
Column("updated_at", DateTime),
UniqueConstraint("data", name="uq_data")
)
这将在自动生成的文件中呈现为
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"my_table",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("data", sa.String(length=50), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=True),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("data", name="uq_data"),
)
# ### end Alembic commands ###
不要为视图发出 CREATE TABLE 语句#
有时创建视图的 Table
实例很方便,以便可以使用常规 SQLAlchemy 技术对其进行查询。不幸的是,这会导致 Alembic 将它们视为需要创建的表,并生成虚假的 create_table()
操作。通过标记此类表并使用 include_object
挂钩将其排除,可以轻松修复此问题
my_view = Table('my_view', metadata, autoload=True, info=dict(is_view=True)) # Flag this as a view
或者,如果您使用声明性表
class MyView(Base):
__tablename__ = 'my_view'
__table_args__ = {'info': {'is_view': True}} # Flag this as a view
然后将 include_object
定义为
def include_object(object, name, type_, reflected, compare_to):
"""
Exclude views from Alembic's consideration.
"""
return not object.info.get('is_view', False)
最后,在 env.py
中将 include_object
作为关键字参数传递给 EnvironmentContext.configure()
。
从一个 .ini 文件运行多个 Alembic 环境#
早在 Alembic 具有 使用多个基准 中描述的“多个基准”功能之前,项目就需要在单个项目中维护多个 Alembic 版本历史记录,其中这些版本历史记录完全独立于彼此,并且每个版本历史记录都引用自己的 alembic_version 表,无论是在多个数据库、架构还是命名空间中。添加了一个简单的方法来支持此功能,即命令行上的 --name
标志。
首先,将创建此形式的 alembic.ini 文件
[DEFAULT]
# all defaults shared between environments go here
sqlalchemy.url = postgresql://scott:tiger@hostname/mydatabase
[schema1]
# path to env.py and migration scripts for schema1
script_location = myproject/revisions/schema1
[schema2]
# path to env.py and migration scripts for schema2
script_location = myproject/revisions/schema2
[schema3]
# path to env.py and migration scripts for schema3
script_location = myproject/revisions/db2
# this schema uses a different database URL as well
sqlalchemy.url = postgresql://scott:tiger@hostname/myotherdatabase
在上面的 [DEFAULT]
部分中,我们设置了默认数据库 URL。然后,我们创建三个部分,对应于项目中不同的修订谱系。这些目录中的每一个都将有自己的 env.py
和一组版本控制文件。然后,当我们运行 alembic
命令时,我们只需向其提供我们想要使用的配置的名称
alembic --name schema2 revision -m "new rev for schema 2" --autogenerate
在上面,alembic
命令使用 [schema2]
中的配置,该配置使用 [DEFAULT]
部分中的默认值填充。
还可以通过创建 Alembic 命令行的自定义前端来自动执行上述方法。
打印 Python 代码以生成特定数据库表#
假设您已经有一个数据库,并且想要生成一些 op.create_table()
和其他指令,这些指令您会在迁移文件中看到。我们如何自动生成该代码?假设数据库架构如下(假设为 MySQL)
CREATE TABLE IF NOT EXISTS `users` (
`id` int(11) NOT NULL,
KEY `id` (`id`)
);
CREATE TABLE IF NOT EXISTS `user_properties` (
`users_id` int(11) NOT NULL,
`property_name` varchar(255) NOT NULL,
`property_value` mediumtext NOT NULL,
UNIQUE KEY `property_name_users_id` (`property_name`,`users_id`),
KEY `users_id` (`users_id`),
CONSTRAINT `user_properties_ibfk_1` FOREIGN KEY (`users_id`)
REFERENCES `users` (`id`) ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
使用 ops.UpgradeOps
、ops.CreateTableOp
和 ops.CreateIndexOp
,我们创建一个迁移文件结构,使用从 SQLAlchemy 反射中获得的 Table
对象。该结构被传递给 autogenerate.render_python_code()
以生成用于迁移文件的 Python 代码
from sqlalchemy import create_engine
from sqlalchemy import MetaData, Table
from alembic import autogenerate
from alembic.operations import ops
e = create_engine("mysql://scott:tiger@localhost/test")
with e.connect() as conn:
m = MetaData()
user_table = Table('users', m, autoload_with=conn)
user_property_table = Table('user_properties', m, autoload_with=conn)
print(autogenerate.render_python_code(
ops.UpgradeOps(
ops=[
ops.CreateTableOp.from_table(table) for table in m.tables.values()
] + [
ops.CreateIndexOp.from_index(idx) for table in m.tables.values()
for idx in table.indexes
]
))
)
输出
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('users',
sa.Column('id', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False),
mysql_default_charset='latin1',
mysql_engine='InnoDB'
)
op.create_table('user_properties',
sa.Column('users_id', mysql.INTEGER(display_width=11), autoincrement=False, nullable=False),
sa.Column('property_name', mysql.VARCHAR(length=255), nullable=False),
sa.Column('property_value', mysql.MEDIUMTEXT(), nullable=False),
sa.ForeignKeyConstraint(['users_id'], ['users.id'], name='user_properties_ibfk_1', ondelete='CASCADE'),
mysql_comment='user properties',
mysql_default_charset='utf8',
mysql_engine='InnoDB'
)
op.create_index('id', 'users', ['id'], unique=False)
op.create_index('users_id', 'user_properties', ['users_id'], unique=False)
op.create_index('property_name_users_id', 'user_properties', ['property_name', 'users_id'], unique=True)
# ### end Alembic commands ###
直接运行 Alembic 操作对象(如在 autogenerate 中)#
Operations
对象有一个称为 Operations.invoke()
的方法,该方法将通用地调用特定操作对象。因此,我们可以使用 autogenerate.produce_migrations()
函数来运行自动生成比较,取回表示更改的 ops.MigrationScript
结构,并且通过一些内部信息,我们可以直接调用它们。
遍历 ops.MigrationScript
结构如下
use_batch = engine.name == "sqlite"
stack = [migrations.upgrade_ops]
while stack:
elem = stack.pop(0)
if use_batch and isinstance(elem, ModifyTableOps):
with operations.batch_alter_table(
elem.table_name, schema=elem.schema
) as batch_ops:
for table_elem in elem.ops:
batch_ops.invoke(table_elem)
elif hasattr(elem, "ops"):
stack.extend(elem.ops)
else:
operations.invoke(elem)
上面,我们通过查找 .ops
属性来检测具有操作集合的元素。检查 ModifyTableOps
允许我们在支持该操作时使用批处理上下文。
下面是一个完整的示例。此处的整体设置从 autogenerate.compare_metadata()
中的示例复制而来
from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from alembic.autogenerate import produce_migrations
from alembic.migration import MigrationContext
from alembic.operations import Operations
from alembic.operations.ops import ModifyTableOps
engine = create_engine("sqlite://", echo=True)
with engine.connect() as conn:
conn.execute(
"""
create table foo (
id integer not null primary key,
old_data varchar(50),
x integer
)"""
)
conn.execute(
"""
create table bar (
data varchar(50)
)"""
)
metadata = MetaData()
Table(
"foo",
metadata,
Column("id", Integer, primary_key=True),
Column("data", Integer),
Column("x", Integer, nullable=False),
)
Table("bat", metadata, Column("info", String(100)))
mc = MigrationContext.configure(engine.connect())
migrations = produce_migrations(mc, metadata)
operations = Operations(mc)
use_batch = engine.name == "sqlite"
stack = [migrations.upgrade_ops]
while stack:
elem = stack.pop(0)
if use_batch and isinstance(elem, ModifyTableOps):
with operations.batch_alter_table(
elem.table_name, schema=elem.schema
) as batch_ops:
for table_elem in elem.ops:
batch_ops.invoke(table_elem)
elif hasattr(elem, "ops"):
stack.extend(elem.ops)
else:
operations.invoke(elem)
测试当前数据库版本是否在头#
确定数据库架构是否是最新的(就应用 Alembic 迁移而言)的秘诀。对于测试或安装套件,它可能很有用,以确定目标数据库是否是最新的。利用 MigrationContext.get_current_heads()
和 ScriptDirectory.get_heads()
方法,以便它能适应分支版本树
from alembic import config, script
from alembic.runtime import migration
from sqlalchemy import engine
def check_current_head(alembic_cfg, connectable):
# type: (config.Config, engine.Engine) -> bool
directory = script.ScriptDirectory.from_config(alembic_cfg)
with connectable.begin() as connection:
context = migration.MigrationContext.configure(connection)
return set(context.get_current_heads()) == set(directory.get_heads())
e = engine.create_engine("mysql://scott:tiger@localhost/test", echo=True)
cfg = config.Config("alembic.ini")
print(check_current_head(cfg, e))
将 Asyncio 与 Alembic 一起使用#
SQLAlchemy 1.4 版本引入了对 asyncio 的实验性支持,允许从异步应用程序使用其大部分界面。目前,Alembic 不直接提供异步 API,但它可以使用 SQLAlchemy 异步引擎来运行迁移和自动生成。
新配置可以使用模板“async”来引导一个环境,该环境可以与异步 DBAPI(如 asyncpg)一起使用,运行命令
alembic init -t async <script_directory_here>
可以通过更新 Alembic 用于启动其操作的 env.py
文件来更新现有配置以使用异步 DBAPI。特别是,只有 run_migrations_online
需要更新为如下面的示例
import asyncio
from sqlalchemy.ext.asyncio import async_engine_from_config
# ... no change required to the rest of the code
def do_run_migrations(connection):
context.configure(connection=connection, target_metadata=target_metadata)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations():
"""In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online():
"""Run migrations in 'online' mode."""
asyncio.run(run_async_migrations())
异步应用程序还可以通过使用 SQLAlchemy run_sync
方法来直接与 Alembic API 交互,以将 Alembic 的非异步 API 调整为异步使用者。
使用 Asyncio 的编程 API 用法(连接共享)#
将 在一条或多条编程迁移命令中共享连接 和 在 Alembic 中使用 Asyncio 的示例结合起来,上面列出的 env.py
可以按如下方式更新
def run_migrations_online():
"""Run migrations in 'online' mode.
"""
connectable = config.attributes.get("connection", None)
if connectable is None:
asyncio.run(run_async_migrations())
else:
do_run_migrations(connectable)
在 alembic.ini
中使用 asyncio 数据库 URL 时,可以从命令行运行 alembic upgrade
等命令。通过编程,可以使用 asyncio 调用相同的 env.py
文件,如下所示
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine
from alembic import command, config
def run_upgrade(connection, cfg):
cfg.attributes["connection"] = connection
command.upgrade(cfg, "head")
async def run_async_upgrade():
async_engine = create_async_engine("sqlite+aiosqlite://", echo=True)
async with async_engine.begin() as conn:
await conn.run_sync(run_upgrade, config.Config("alembic.ini"))
asyncio.run(run_async_upgrade())
数据迁移 - 一般技术#
Alembic 迁移专为架构迁移而设计。数据迁移的性质本质上是不同的,事实上,不建议编写与 Alembic 的架构版本控制模型集成的迁移。例如,降级很难解决,因为它们可能需要删除数据,甚至可能无法检测到数据。
警告
该解决方案需要针对每个单独的应用程序和迁移专门设计。没有通用的规则,以下文本仅基于经验的建议。
数据迁移有三种基本方法。
小数据#
小数据迁移很容易执行,尤其是在向新表中添加初始数据的情况下。可以使用 Operations.bulk_insert()
来处理这些数据。
单独的迁移脚本#
一种可能性是完全独立于 alembic 迁移的脚本。然后按照以下步骤处理完整的迁移
运行初始 alembic 迁移(新列等)
运行单独的数据迁移脚本
运行最终的 alembic 迁移(数据库约束、删除列等)
数据迁移脚本可能还需要一个单独的 ORM 模型来处理数据库的中间状态。
在线迁移#
应用程序维护两个版本的架构版本。在两个位置执行写入,而后台脚本会将所有剩余数据移到其他位置。这种技术非常具有挑战性且耗时,因为它需要自定义应用程序逻辑来处理中间状态。