-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmigrator.py
265 lines (219 loc) · 10.3 KB
/
migrator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
# coding: utf-8
"""Simple declarative schema migration for SQLite.
See <https://david.rothlis.net/declarative-schema-migration-for-sqlite>.
Author: William Manley <[email protected]>.
Copyright © 2019-2022 Stb-tester.com Ltd.
License: MIT.
"""
import logging
import re
import sqlite3
from textwrap import dedent
def dumb_migrate_db(db, schema, allow_deletions=False):
    """
    Bring the database `db` in line with the SQL text `schema`, keeping the
    existing data.

    Tables named in `schema` are created when missing, tables that are no
    longer mentioned are dropped, and columns and indices are added or
    removed as required.

    Schema changes that are always handled:

    1. Adding a new table
    2. Adding, deleting or modifying an index
    3. Adding a column to an existing table, provided the new column can be
       NULL or has a DEFAULT value specified
    4. Changing a column to remove NULL or DEFAULT, provided no value stored
       in the database is NULL
    5. Changing the type of a column
    6. Changing the user_version

    Schema changes that are handled only when `allow_deletions=True`:

    1. Deleting tables
    2. Deleting columns from tables

    If the new schema needs a table or column to be deleted while
    `allow_deletions=False`, `RuntimeError` is raised.

    Note: no transaction may be open on `db` when this function is called;
    one is used internally.  To run additional steps as part of the same
    migration, use `DBMigrator` directly.

    Rowid columns generated internally by SQLite may change value as a
    result of the migration.

    Returns True if the migration made any change to the database, False
    otherwise.
    """
    migrator = DBMigrator(db, schema, allow_deletions)
    # Entering the migrator opens the transaction; leaving it finishes the
    # migration, after which the change counter is final.
    with migrator:
        migrator.migrate()
    return bool(migrator.n_changes)
class DBMigrator:
    """Context manager that migrates `db` towards a declarative `schema`.

    The target schema is loaded into a private in-memory "pristine"
    database; `migrate()` then compares the catalogue of `db` against it
    and issues the statements needed to make them match.

    Typical use::

        with DBMigrator(db, schema) as migrator:
            migrator.migrate()

    Attributes:
        db: the connection being migrated.
        schema: the SQL text describing the desired schema.
        allow_deletions: whether tables/columns may be dropped.
        pristine: in-memory connection holding the desired schema.
        n_changes: number of logged statements executed so far.
        orig_foreign_keys: value of `PRAGMA foreign_keys` on `db` when the
            context was entered (None before that).
    """

    def __init__(self, db, schema, allow_deletions=False):
        self.db = db
        self.schema = schema
        self.allow_deletions = allow_deletions

        # Reference database: whatever `schema` produces here is the state
        # that `db` is migrated towards.
        self.pristine = sqlite3.connect(":memory:")
        self.pristine.executescript(schema)
        self.n_changes = 0

        self.orig_foreign_keys = None

    @staticmethod
    def _quote_identifier(name):
        """Return `name` as a double-quoted SQL identifier."""
        return '"%s"' % name.replace('"', '""')

    def log_execute(self, msg, sql, args=None):
        """Log `sql` (with `msg` and any `args`), run it on `db` and count it
        as a change."""
        # It's important to log any changes we're making to the database for
        # forensics later
        msg_tmpl = "Database migration: %s with SQL:\n%s"
        msg_argv = (msg, _left_pad(dedent(sql)))
        if args:
            msg_tmpl += " args = %r"
            msg_argv += (args,)
        else:
            args = []
        logging.info(msg_tmpl, *msg_argv)
        self.db.execute(sql, args)
        self.n_changes += 1

    def __enter__(self):
        """Switch foreign key enforcement off if it was on, then open the
        transaction the migration runs in."""
        self.orig_foreign_keys = (
            self.db.execute("PRAGMA foreign_keys").fetchone()[0])
        if self.orig_foreign_keys:
            self.log_execute("Disable foreign keys temporarily for migration",
                             "PRAGMA foreign_keys = OFF")
            # This doesn't count as a change because we'll undo it at the end
            self.n_changes = 0

        self.db.__enter__()
        self.db.execute('BEGIN')
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Finish the transaction via the connection's own `__exit__`, then
        settle `PRAGMA foreign_keys` (and VACUUM if anything changed)."""
        self.db.__exit__(exc_type, exc_value, exc_tb)
        if exc_value is None:
            # The SQLite docs say:
            #
            # > This pragma is a no-op within a transaction; foreign key
            # > constraint enforcement may only be enabled or disabled when
            # > there is no pending BEGIN or SAVEPOINT.
            old_changes = self.n_changes
            new_val = self._migrate_pragma('foreign_keys')
            if new_val == self.orig_foreign_keys:
                # We only restored what __enter__ switched off: not a change.
                self.n_changes = old_changes

            # SQLite docs say:
            #
            # > A VACUUM will fail if there is an open transaction on the
            # > database connection that is attempting to run the VACUUM.
            if self.n_changes:
                self.db.execute("VACUUM")
        else:
            # The migration failed: put foreign key enforcement back the way
            # we found it.
            if self.orig_foreign_keys:
                self.log_execute(
                    "Re-enable foreign keys after migration",
                    "PRAGMA foreign_keys = ON")

    def migrate(self):
        """Make the tables, indices and user_version of `db` match the
        pristine schema.

        Raises:
            RuntimeError: if a table or column would have to be deleted while
                `allow_deletions` is false, or if the pristine schema turns
                foreign keys on and `PRAGMA foreign_key_check` reports
                violations after the migration.
        """
        quote = self._quote_identifier

        # In CI the database schema may be changing all the time.  This checks
        # the current db and if it doesn't match database.sql we will
        # modify it so it does match where possible.
        #
        # String literals are single-quoted: double-quoted string literals
        # are an SQLite quirk that can be disabled at build time.
        pristine_tables = dict(self.pristine.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = 'table' AND name != 'sqlite_sequence'""").fetchall())
        pristine_indices = dict(self.pristine.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = 'index'""").fetchall())

        tables = dict(self.db.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = 'table' AND name != 'sqlite_sequence'""").fetchall())

        new_tables = set(pristine_tables) - set(tables)
        removed_tables = set(tables) - set(pristine_tables)
        if removed_tables and not self.allow_deletions:
            raise RuntimeError(
                "Database migration: Refusing to delete tables %r" %
                removed_tables)

        # Only tables present on both sides can be "modified"; brand-new
        # tables are simply created below and need no rebuild.
        modified_tables = set(
            name for name, sql in pristine_tables.items()
            if name in tables
            and normalise_sql(tables[name]) != normalise_sql(sql))

        # This PRAGMA is automatically disabled when the db is committed
        self.db.execute("PRAGMA defer_foreign_keys = TRUE")

        # New and removed tables are easy (sorted so the log is stable):
        for tbl_name in sorted(new_tables):
            self.log_execute("Create table %s" % tbl_name,
                             pristine_tables[tbl_name])
        for tbl_name in sorted(removed_tables):
            self.log_execute("Drop table %s" % tbl_name,
                             "DROP TABLE %s" % quote(tbl_name))

        for tbl_name in sorted(modified_tables):
            new_tbl_name = tbl_name + "_migration_new"

            # The SQLite documentation insists that we create the new table and
            # rename it over the old rather than moving the old out of the way
            # and then creating the new.
            #
            # Only the first occurrence (the table being declared) is
            # renamed, so a column or REFERENCES clause that happens to use
            # the same word is left alone.  The callable keeps the
            # replacement literal even if the name contains backslashes.
            create_table_sql = re.sub(
                r"\b%s\b" % re.escape(tbl_name),
                lambda _match: new_tbl_name,
                pristine_tables[tbl_name],
                count=1)
            self.log_execute(
                "Columns change: Create table %s with updated schema" %
                tbl_name, create_table_sql)

            cols = set(
                row[1] for row in self.db.execute(
                    "PRAGMA table_info(%s)" % quote(tbl_name)))
            pristine_col_order = [
                row[1] for row in self.pristine.execute(
                    "PRAGMA table_info(%s)" % quote(tbl_name))]
            pristine_cols = set(pristine_col_order)

            removed_columns = cols - pristine_cols
            if not self.allow_deletions and removed_columns:
                logging.warning(
                    "Database migration: Refusing to remove columns %r from "
                    "table %s.  Current cols are %r attempting migration to %r",
                    removed_columns, tbl_name, cols, pristine_cols)
                raise RuntimeError(
                    "Database migration: Refusing to remove columns %r from "
                    "table %s" % (removed_columns, tbl_name))

            logging.info("cols: %s, pristine_cols: %s", cols, pristine_cols)

            # Copy the columns both versions share, in the order the new
            # schema declares them, quoting each name.
            common = ", ".join(
                quote(col) for col in pristine_col_order if col in cols)
            self.log_execute(
                "Migrate data for table %s" % tbl_name, """\
                INSERT INTO {new_tbl} ({common})
                SELECT {common} FROM {old_tbl}""".format(
                    new_tbl=quote(new_tbl_name),
                    old_tbl=quote(tbl_name),
                    common=common))

            # Don't need the old table any more
            self.log_execute(
                "Drop old table %s now data has been migrated" % tbl_name,
                "DROP TABLE %s" % quote(tbl_name))

            self.log_execute(
                "Columns change: Move new table %s over old" % tbl_name,
                "ALTER TABLE %s RENAME TO %s" % (
                    quote(new_tbl_name), quote(tbl_name)))

        # Migrate the indices
        indices = dict(self.db.execute("""\
            SELECT name, sql FROM sqlite_master
            WHERE type = 'index'""").fetchall())
        for name in sorted(set(indices) - set(pristine_indices)):
            self.log_execute("Dropping obsolete index %s" % name,
                             "DROP INDEX %s" % quote(name))
        for name, sql in pristine_indices.items():
            if name not in indices:
                self.log_execute("Creating new index %s" % name, sql)
            elif sql != indices[name]:
                self.log_execute(
                    "Index %s changed: Dropping old version" % name,
                    "DROP INDEX %s" % quote(name))
                self.log_execute(
                    "Index %s changed: Creating updated version in its place" %
                    name, sql)

        self._migrate_pragma('user_version')

        if self.pristine.execute("PRAGMA foreign_keys").fetchone()[0]:
            if self.db.execute("PRAGMA foreign_key_check").fetchall():
                raise RuntimeError(
                    "Database migration: Would fail foreign_key_check")

    def _migrate_pragma(self, pragma):
        """Set integer `pragma` on `db` to the pristine database's value if
        they differ, and return the pristine value."""
        pristine_val = self.pristine.execute(
            "PRAGMA %s" % pragma).fetchone()[0]
        val = self.db.execute("PRAGMA %s" % pragma).fetchone()[0]

        if val != pristine_val:
            self.log_execute(
                "Set %s to %i from %i" % (pragma, pristine_val, val),
                "PRAGMA %s = %i" % (pragma, pristine_val))

        return pristine_val
def _left_pad(text, indent=" "):
"""Maybe I can find a package in pypi for this?"""
return "\n".join(indent + line for line in text.split('\n'))
def normalise_sql(sql):
    """Reduce a SQL statement to a canonical form for equality comparison.

    `--` line comments are removed, runs of whitespace collapse to a single
    space, spaces around `(`, `)` and `,` are dropped, and double quotes
    around plain word identifiers are stripped.  The result is meant only
    for comparing two statements, not for execution.

    NOTE: the text is not parsed, so a `--` or quoted word inside a string
    literal is rewritten like any other.
    """
    # Remove comments.  The terminating newline is deliberately kept so the
    # tokens on either side of the comment stay separated, and a comment on
    # the last line (with no newline after it) is removed too.
    sql = re.sub(r'--[^\n]*', "", sql)
    # Normalise whitespace:
    sql = re.sub(r'\s+', " ", sql)
    sql = re.sub(r" *([(),]) *", r"\1", sql)
    # Remove unnecessary quotes
    sql = re.sub(r'"(\w+)"', r"\1", sql)
    return sql.strip()
def test_normalise_sql():
    """Comments, redundant whitespace and needless quotes are stripped,
    while a quoted identifier containing spaces keeps its quotes."""
    statement = (
        'CREATE TABLE "Node"( -- This is my table\n'
        '-- There are many like it but this one is mine\n'
        'A b, C D, "E F G", h)'
    )
    expected = 'CREATE TABLE Node(A b,C D,"E F G",h)'
    assert normalise_sql(statement) == expected