Commit 99a0ecd

Add Safe Ecto Migration guides
1 parent 383561d commit 99a0ecd

6 files changed

Lines changed: 1848 additions & 7 deletions

guides/backfilling_data.md

Lines changed: 365 additions & 0 deletions
# Backfilling Data

When I say "backfilling data", I mean any attempt to change data in bulk. This can happen in code through migrations, application code, UIs that allow multiple selections and updates, or in a console connected to a running application. Since bulk changes affect a lot of data, it's always a good idea to have the code reviewed before it runs. You also want to check that it runs efficiently and does not overwhelm the database. Ideally, the code is also written to be safe to re-run. For these reasons, please don't change data in bulk through a console!

We're going to focus on bulk changes executed through Ecto migrations, but the same principles apply to any case where bulk changes are being made. A typical scenario where you might need to run a data migration is when you need to fill in data for records that already exist (hence, backfilling data). This usually comes up when table structures are changed in the database.

Some examples of backfilling:

- Populating data into a new column
- Changing a column to make it required, which may require changing existing rows to set a value
- Splitting one database table into several
- Fixing bad data

For simplicity, we are using `Ecto.Migrator` to run our data migrations, but it's important to not let these migrations break developers' environments over time (more on this below). If using migrations to change data is a normal process that happens regularly, then you may consider exploring a migration system outside of `Ecto.Migrator` that is observable, hooks into error reporting and metrics, and allows for dry runs. This guide is intended as a starting point, and since Ecto ships with a migration runner, we'll leverage it to also run the data migrations.

There are both bad and good ways to write these data migrations. Let's explore some:

## Bad

In the following example, a migration references the schema `MyApp.MySchema`.

```elixir
defmodule MyApp.Repo.DataMigrations.BackfillPosts do
  use Ecto.Migration
  import Ecto.Query

  def change do
    alter table("posts") do
      add :new_data, :text
    end

    flush()

    # Ecto rejects comparisons with nil in keyword filters, so use is_nil/1.
    MyApp.MySchema
    |> where([s], is_nil(s.new_data))
    |> MyApp.Repo.update_all(set: [new_data: "some data"])
  end
end
```

The problem is that the code and schema may change over time, while a migration assumes the schema looks the way it did when the migration was written. In the future, many of those assumptions may no longer be true. For example, the `new_data` column may no longer be present in the schema, causing the query to fail if this migration is run months later.

Additionally, in your development environment, you might have 10 records to migrate; in staging, you might have 100; in production, you might have 1 billion to migrate. Scaling your approach matters.

Ultimately, there are several bad practices here:

1. The Ecto schema in the query may change after this migration was written.
1. If you try to backfill the data all at once, it may exhaust the database memory and/or CPU if it's changing a large data set.
1. Backfilling data inside a transaction for the migration locks row updates for the duration of the migration, even if you are updating in batches.
1. Disabling the transaction for the migration and only batching updates may still spike the database CPU to 100%, causing other concurrent reads or writes to time out.

## Good

There are four keys to backfilling safely:

1. running outside a transaction
1. batching
1. throttling
1. resiliency

As we've learned in this guide, it's straight-forward to disable the migration transactions. Add these options to the migration:

```elixir
@disable_ddl_transaction true
@disable_migration_lock true
```

Batching our data migrations still has several challenges.

We'll start with how to paginate efficiently: `LIMIT`/`OFFSET` by itself is expensive for large tables (queries start fast, but slow to a crawl in the later pages of the table), so we must find another way to paginate. Since we cannot use a database transaction, we also cannot leverage cursors, since they require a transaction. This leaves us with [keyset pagination](https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/).
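
To make the keyset idea concrete, here is a minimal in-memory sketch with no database involved. The `KeysetSketch` module and its functions are hypothetical names used only for illustration. A sorted list of IDs stands in for an indexed table, and dropping everything up to `last_id` plays the role of a `WHERE id > last_id ORDER BY id LIMIT n` query.

```elixir
defmodule KeysetSketch do
  # Hypothetical helper: return up to `limit` IDs strictly greater than `last_id`.
  # `ids` must already be sorted ascending, as an index scan would return them.
  def page(ids, last_id, limit) do
    ids
    |> Enum.drop_while(&(&1 <= last_id))
    |> Enum.take(limit)
  end

  # Walk every page, remembering only the last ID seen between pages.
  def all_pages(ids, limit, last_id \\ 0, acc \\ []) do
    case page(ids, last_id, limit) do
      [] -> Enum.reverse(acc)
      batch -> all_pages(ids, limit, List.last(batch), [batch | acc])
    end
  end
end

ids = [1, 2, 5, 8, 13, 21, 34]
pages = KeysetSketch.all_pages(ids, 3)
IO.inspect(pages)
```

Each page only needs the previous page's last ID, which is what lets a real query seek through the primary key index instead of reading and discarding skipped rows the way `OFFSET` does.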

For querying and updating the data, there are two ways to "snapshot" your schema at the time of the migration. We'll use both options below in the examples:

1. Execute raw SQL that represents the table at that moment. Do not use Ecto schemas. Prefer this approach when you can. Your application's Ecto schemas will change over time, but your migration should not, so an application schema is not a true snapshot of the table at the time of the migration.
1. Write a small Ecto schema module inside the migration that only uses what you need. Then use that in your data migration. This is helpful if you prefer the Ecto API, and it decouples the migration from your application's Ecto schemas, which evolve separately.

For throttling, we can simply add a `Process.sleep(@throttle_ms)` for each page.
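
Throttling also puts a floor on how long a backfill takes, so a rough estimate is worth doing before picking values. The sketch below is illustrative only: `ThrottleEstimate` is a hypothetical module, and it counts sleep time alone, ignoring query time. The batch size of 1000 and the 100 ms throttle are the values used in this guide's migration examples, and 1 billion rows is the production figure mentioned earlier.

```elixir
defmodule ThrottleEstimate do
  # Hypothetical helper: total milliseconds spent sleeping, one sleep per batch.
  def sleep_ms(row_count, batch_size, throttle_ms) do
    # Round up so a partial final batch still counts as a batch.
    batches = div(row_count + batch_size - 1, batch_size)
    batches * throttle_ms
  end
end

# 1 billion rows, 1000 rows per batch, 100 ms sleep per batch
ms = ThrottleEstimate.sleep_ms(1_000_000_000, 1000, 100)
IO.puts("hours spent sleeping: #{Float.round(ms / 3_600_000, 1)}")
```

With these numbers the sleeps alone add up to roughly 27.8 hours, which is one reason the backfill needs to survive being stopped and restarted.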

For resiliency, we need to ensure that we handle errors without losing our progress. You don't want to migrate the same data twice! Most data migrations I have run find some records in a state that I wasn't expecting. This causes the data migration to fail. When the data migration stops, that means I have to write a little bit more code to handle that scenario, and re-run the migration. Every time the data migration is re-run, it should pick up where it left off without revisiting already-migrated records.

Finally, to manage these data migrations separately, we need to:

1. Store data migrations separately from your schema migrations.
1. Run the data migrations manually.

To achieve this, be inspired by [Ecto's documentation on creating a Release module](`Ecto.Migrator`), and extend your release module to accept options that are passed into `Ecto.Migrator`, specifying the version to migrate to and the data migrations' file path. For example:

```elixir
defmodule MyApp.Release do
  # ...
  @doc """
  Migrate data in the database. Defaults to migrating to the latest, `[all: true]`
  Also accepts `[step: 1]`, or `[to: 20200118045751]`
  """
  def migrate_data(opts \\ [all: true]) do
    for repo <- repos() do
      path = Ecto.Migrator.migrations_path(repo, "data_migrations")
      {:ok, _, _} = Ecto.Migrator.with_repo(repo, &Ecto.Migrator.run(&1, path, :up, opts))
    end
  end
end
```

## Batching Deterministic Data

If the data can be queried with a condition that no longer holds after the update, then you can repeatedly query and update the data until the query result is empty. For example, if a column is currently null and will be updated to not be null, then you can query for the null records and pick up where you left off.

Here's how we can manage the backfill:

1. Disable migration transactions.
1. Use keyset pagination: Order the data, find rows greater than the last mutated row and limit by batch size.
1. For each page, mutate the records.
1. Check for failed updates and handle them appropriately.
1. Use the last mutated record's ID as the starting point for the next page. This helps with resiliency and prevents looping on the same record over and over again.
1. Arbitrarily sleep to throttle and prevent exhausting the database.
1. Rinse and repeat until there are no more records.

For example:

```bash
mix ecto.gen.migration --migrations-path=priv/repo/data_migrations backfill_posts
```

And modify the migration:

```elixir
defmodule MyApp.Repo.DataMigrations.BackfillPosts do
  use Ecto.Migration
  import Ecto.Query

  @disable_ddl_transaction true
  @disable_migration_lock true
  @batch_size 1000
  @throttle_ms 100

  def up do
    throttle_change_in_batches(&page_query/1, &do_change/1)
  end

  def down, do: :ok

  def do_change(batch_of_ids) do
    {_updated, results} = repo().update_all(
      from(r in "weather", select: r.id, where: r.id in ^batch_of_ids),
      [set: [approved: true]],
      log: :info
    )
    not_updated = MapSet.difference(MapSet.new(batch_of_ids), MapSet.new(results)) |> MapSet.to_list()
    Enum.each(not_updated, &handle_non_update/1)
    Enum.sort(results)
  end

  def page_query(last_id) do
    # Notice how we do not use Ecto schemas here.
    from(
      r in "weather",
      select: r.id,
      where: is_nil(r.approved) and r.id > ^last_id,
      order_by: [asc: r.id],
      limit: @batch_size
    )
  end

  # If you have integer IDs, default last_pos = 0
  # If you have binary IDs, default last_pos = "00000000-0000-0000-0000-000000000000"
  defp throttle_change_in_batches(query_fun, change_fun, last_pos \\ 0)
  defp throttle_change_in_batches(_query_fun, _change_fun, nil), do: :ok
  defp throttle_change_in_batches(query_fun, change_fun, last_pos) do
    case repo().all(query_fun.(last_pos), [log: :info, timeout: :infinity]) do
      [] ->
        :ok

      ids ->
        results = change_fun.(List.flatten(ids))
        # Results are sorted ascending, so the last ID is the next keyset position.
        next_page = List.last(results)
        Process.sleep(@throttle_ms)
        throttle_change_in_batches(query_fun, change_fun, next_page)
    end
  end

  defp handle_non_update(id) do
    raise "#{inspect(id)} was not updated"
  end
end
```
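
The control flow of this migration can be exercised without a database. The following is a minimal sketch, not the migration itself: `BatchLoopSketch` is a hypothetical module, a map of `id => approved` stands in for the `weather` table, and throttling is left out. It illustrates why the loop is safe to re-run: rows that are already approved never match the page query.

```elixir
defmodule BatchLoopSketch do
  @batch_size 2

  # Stand-in for page_query/1: IDs still unapproved and greater than last_id.
  def page(table, last_id) do
    table
    |> Enum.filter(fn {id, approved} -> is_nil(approved) and id > last_id end)
    |> Enum.map(fn {id, _approved} -> id end)
    |> Enum.sort()
    |> Enum.take(@batch_size)
  end

  # Returns the updated table and the number of batches it took.
  def run(table, last_id \\ 0, batches \\ 0) do
    case page(table, last_id) do
      [] ->
        {table, batches}

      ids ->
        table = Enum.reduce(ids, table, &Map.put(&2, &1, true))
        run(table, List.last(ids), batches + 1)
    end
  end
end

# Rows 1 and 2 were approved by an earlier, interrupted run.
table = %{1 => true, 2 => true, 3 => nil, 4 => nil, 5 => nil}
{done, batches} = BatchLoopSketch.run(table)
IO.inspect({done, batches})
```

Running `BatchLoopSketch.run/1` a second time on the finished table does no work, which mirrors what happens when the real migration is re-run after a failure.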

## Batching Arbitrary Data

If the data being updated does not indicate it's already been updated, then we need to take a snapshot of the current data and store it temporarily. For example, if all rows should increment a column's value by 10, how would you know if a record was already updated? You could load a list of IDs into the application during the migration, but what if the process crashes? Instead we're going to keep the data we need in the database.

To do this, it works well if we can pick a specific point in time where all records _after_ that point in time do not need adjustment. This happens when you realize a bug was creating bad data: once the bug is fixed and deployed, all new entries are good and should not be touched as we clean up the bad data. For this example, we'll use `inserted_at` as our marker. Let's say that the bug was fixed on a midnight deploy on 2021-08-22.

Here's how we'll manage the backfill:

1. Create a "temporary" table. In this example, we're creating a real table that we'll drop at the end of the data migration. In Postgres, there are [actual temporary tables](https://www.postgresql.org/docs/12/sql-createtable.html) that are discarded after the session is over; we're not using those because we need resiliency in case the data migration encounters an error. The error would cause the session to be over, and therefore the temporary table tracking progress would be lost. Real tables don't have this problem. Likewise, we don't want to store IDs in application memory during the migration for the same reason.
1. Populate that temporary table with IDs of records that need to update. This query only requires a read of the current records, so there are no consequential locks occurring when populating, but be aware this could be a lengthy query. Populating this table can occur at creation or afterwards; in this example we'll populate it at table creation.
1. Ensure there's an index on the temporary table so it's fast to delete IDs from it. I use an index instead of a primary key because it's easier to re-run the migration in case there's an error. There isn't a straight-forward way to `CREATE IF NOT EXISTS` on a primary key, but you can do that easily with an index.
1. Use keyset pagination to pull batches of IDs from the temporary table. Do this inside a database transaction and lock records for updates. Each batch should read and update within milliseconds, so this should have little impact on concurrent reads and writes.
1. For each batch of records, determine the data changes that need to happen. This can happen for each record.
1. [Upsert](https://wiki.postgresql.org/wiki/UPSERT) those changes to the real table. This insert will include the ID of the record that already exists and a list of attributes to change for that record. Since these insertions will conflict with existing records, we'll instruct Postgres to replace certain fields on conflicts.
1. Delete those IDs from the temporary table since they're updated on the real table. Close the database transaction for that batch.
1. Throttle so we don't overwhelm the database, and also give opportunity to other concurrent processes to work.
1. Rinse and repeat until the temporary table is empty.
1. Finally, drop the temporary table when empty.

Let's see how this can work:

```bash
mix ecto.gen.migration --migrations-path=priv/repo/data_migrations backfill_weather
```

Modify the migration:

```elixir
# Both of these modules are in the same migration file
# In this example, we'll define a new Ecto Schema that is a snapshot
# of the current underlying table and no more.
defmodule MyApp.Repo.DataMigrations.BackfillWeather.MigratingSchema do
  use Ecto.Schema

  # Copy of the schema at the time of migration
  schema "weather" do
    field :temp_lo, :integer
    field :temp_hi, :integer
    field :prcp, :float
    field :city, :string

    timestamps(type: :naive_datetime_usec)
  end
end

defmodule MyApp.Repo.DataMigrations.BackfillWeather do
  use Ecto.Migration
  import Ecto.Query
  alias MyApp.Repo.DataMigrations.BackfillWeather.MigratingSchema

  @disable_ddl_transaction true
  @disable_migration_lock true
  @temp_table_name "records_to_update"
  @batch_size 1000
  @throttle_ms 100

  def up do
    # The bug was fixed at midnight on 2021-08-22; only older rows need fixing.
    repo().query!("""
    CREATE TABLE IF NOT EXISTS "#{@temp_table_name}" AS
    SELECT id FROM weather WHERE inserted_at < '2021-08-22T00:00:00'
    """, [], log: :info, timeout: :infinity)
    flush()

    create_if_not_exists index(@temp_table_name, [:id])
    flush()

    throttle_change_in_batches(&page_query/1, &do_change/1)

    # You may want to check if it's empty before dropping it.
    # Since we're raising an exception on non-updates
    # we don't have to do that in this example.
    drop table(@temp_table_name)
  end

  def down, do: :ok

  def do_change(batch_of_ids) do
    # Wrap in a transaction to momentarily lock records during read/update
    repo().transaction(fn ->
      mutations =
        from(
          r in MigratingSchema,
          where: r.id in ^batch_of_ids,
          lock: "FOR UPDATE"
        )
        |> repo().all()
        |> Enum.reduce([], &mutation/2)

      # Don't be fooled by the name `insert_all`, this is actually an upsert
      # that will update existing records when conflicting; they should all
      # conflict since the ID is included in the update.

      {_updated, results} = repo().insert_all(
        MigratingSchema,
        mutations,
        returning: [:id],
        # Alternatively, {:replace_all_except, [:id, :inserted_at]}
        on_conflict: {:replace, [:temp_lo, :updated_at]},
        conflict_target: [:id],
        placeholders: %{now: NaiveDateTime.utc_now()},
        log: :info
      )
      results = Enum.map(results, & &1.id)

      not_updated =
        mutations
        |> Enum.map(& &1[:id])
        |> MapSet.new()
        |> MapSet.difference(MapSet.new(results))
        |> MapSet.to_list()

      Enum.each(not_updated, &handle_non_update/1)

      # Every ID in the batch is now either updated or needed no update, so
      # remove the whole batch. Deleting only the upserted IDs would leave
      # skipped records in the temporary table and could end the loop early.
      repo().delete_all(from(r in @temp_table_name, where: r.id in ^batch_of_ids))

      Enum.sort(batch_of_ids)
    end)
  end

  def mutation(record, mutations_acc) do
    # This logic can be whatever you need; we'll just do something simple
    # here to illustrate

    if record.temp_hi > 1 do
      # No update needed
      mutations_acc
    else
      # Upserts don't update autogenerated fields like timestamps, so be sure
      # to update them yourself. The inserted_at value should never be used
      # since all these records are already inserted, and we won't replace
      # this field on conflicts; we just need it to satisfy table constraints.
      [%{
        id: record.id,
        temp_lo: record.temp_hi - 10,
        inserted_at: {:placeholder, :now},
        updated_at: {:placeholder, :now}
      } | mutations_acc]
    end
  end

  def page_query(last_id) do
    from(
      r in @temp_table_name,
      select: r.id,
      where: r.id > ^last_id,
      order_by: [asc: r.id],
      limit: @batch_size
    )
  end

  defp handle_non_update(id) do
    raise "#{inspect(id)} was not updated"
  end

  # If you have integer IDs, fallback last_pos = 0
  # If you have binary IDs, fallback last_pos = "00000000-0000-0000-0000-000000000000"
  defp throttle_change_in_batches(query_fun, change_fun, last_pos \\ 0)
  defp throttle_change_in_batches(_query_fun, _change_fun, nil), do: :ok
  defp throttle_change_in_batches(query_fun, change_fun, last_pos) do
    case repo().all(query_fun.(last_pos), [log: :info, timeout: :infinity]) do
      [] ->
        :ok

      ids ->
        case change_fun.(List.flatten(ids)) do
          {:ok, results} ->
            # Results are sorted ascending, so the last ID is the next keyset position.
            next_page = List.last(results)
            Process.sleep(@throttle_ms)
            throttle_change_in_batches(query_fun, change_fun, next_page)
          error ->
            # `raise/1` needs a message or exception, not an error tuple.
            raise "batch failed: #{inspect(error)}"
        end
    end
  end
end
```
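
As a database-free illustration of the bookkeeping in this approach, the sketch below uses a `MapSet` in place of the temporary table. All names here (`PendingSketch`, `run/4`) are hypothetical. It is meant to show that a change which is not safe to repeat (adding 10) is applied once per ID, because each ID leaves the pending set in the same step that changes its row, even when a run is cut short and restarted.

```elixir
defmodule PendingSketch do
  # Process up to `max_batches` batches, then stop (simulating an interruption).
  # Returns {rows, pending} so a later call can resume from the pending set.
  def run(rows, pending, batch_size, max_batches \\ :infinity)

  def run(rows, pending, _batch_size, 0), do: {rows, pending}

  def run(rows, pending, batch_size, max_batches) do
    case pending |> Enum.sort() |> Enum.take(batch_size) do
      [] ->
        {rows, pending}

      ids ->
        # Change the rows and remove the IDs from the pending set together,
        # mirroring the per-batch transaction in the migration.
        rows = Enum.reduce(ids, rows, fn id, acc -> Map.update!(acc, id, &(&1 + 10)) end)
        pending = MapSet.difference(pending, MapSet.new(ids))
        run(rows, pending, batch_size, decrement(max_batches))
    end
  end

  defp decrement(:infinity), do: :infinity
  defp decrement(n), do: n - 1
end

rows = %{1 => 0, 2 => 0, 3 => 0}
pending = MapSet.new(Map.keys(rows))

# The first run is cut short after one batch; the second run resumes.
{rows, pending} = PendingSketch.run(rows, pending, 2, 1)
{rows, pending} = PendingSketch.run(rows, pending, 2)
IO.inspect({rows, MapSet.size(pending)})
```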

---

This guide was originally published on [Fly.io Phoenix Files](https://fly.io/phoenix-files/backfilling-data/).
