-
Notifications
You must be signed in to change notification settings - Fork 467
Expand file tree
/
Copy pathpull.go
More file actions
239 lines (229 loc) · 8.6 KB
/
pull.go
File metadata and controls
239 lines (229 loc) · 8.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
package pull
import (
"bytes"
"context"
_ "embed"
"fmt"
"math"
"os"
"path/filepath"
"strconv"
"strings"
"github.com/go-errors/errors"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/spf13/afero"
"github.com/spf13/viper"
"github.com/supabase/cli/internal/db/declarative"
"github.com/supabase/cli/internal/db/diff"
"github.com/supabase/cli/internal/db/dump"
"github.com/supabase/cli/internal/db/start"
"github.com/supabase/cli/internal/migration/format"
"github.com/supabase/cli/internal/migration/list"
"github.com/supabase/cli/internal/migration/new"
"github.com/supabase/cli/internal/migration/repair"
"github.com/supabase/cli/internal/utils"
"github.com/supabase/cli/pkg/migration"
)
var (
errMissing = errors.New("No migrations found")
errInSync = errors.New("No schema changes found")
errConflict = errors.Errorf("The remote database's migration history does not match local files in %s directory.", utils.MigrationsDir)
)
func Run(ctx context.Context, schema []string, config pgconn.Config, name string, usePgDelta bool, usePgDeltaDiff bool, differ diff.DiffFunc, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
// 1. Check postgres connection
conn, err := utils.ConnectByConfig(ctx, config, options...)
if err != nil {
return err
}
defer conn.Close(context.Background())
// In experimental mode, allow db pull to switch from migration-file output to
// declarative-file output through pg-delta when explicitly requested.
if usePgDelta {
return pullDeclarativePgDelta(ctx, schema, config, fsys, options...)
}
if viper.GetBool("EXPERIMENTAL") {
var buf bytes.Buffer
if err := migration.DumpRole(ctx, config, &buf, dump.DockerExec); err != nil {
return err
}
if err := migration.DumpSchema(ctx, config, &buf, dump.DockerExec); err != nil {
return err
}
// TODO: handle managed schemas
return format.WriteStructuredSchemas(ctx, &buf, fsys)
}
// 2. Pull schema
timestamp := utils.GetCurrentTimestamp()
path := new.GetMigrationPath(timestamp, name)
if err := run(ctx, schema, path, conn, usePgDeltaDiff, differ, fsys); err != nil {
return err
}
// 3. Insert a row to `schema_migrations`
fmt.Fprintln(os.Stderr, "Schema written to "+utils.Bold(path))
if shouldUpdate, err := utils.NewConsole().PromptYesNo(ctx, "Update remote migration history table?", true); err != nil {
return err
} else if shouldUpdate {
return repair.UpdateMigrationTable(ctx, conn, []string{timestamp}, repair.Applied, false, fsys)
}
return nil
}
// pullDeclarativePgDelta exports remote schema into declarative SQL files by
// diffing against an empty shadow baseline with pg-delta declarative export.
//
// This path is separate from run() because it does not produce or update
// timestamped migration files.
func pullDeclarativePgDelta(ctx context.Context, schema []string, config pgconn.Config, fsys afero.Fs, options ...func(*pgx.ConnConfig)) error {
fmt.Fprintln(os.Stderr, "Preparing declarative schema export using pg-delta...")
shadow, err := diff.CreateShadowDatabase(ctx, utils.Config.Db.ShadowPort)
if err != nil {
return err
}
defer utils.DockerRemove(shadow)
if err := start.WaitForHealthyService(ctx, utils.Config.Db.HealthTimeout, shadow); err != nil {
return err
}
shadowConfig := pgconn.Config{
Host: utils.Config.Hostname,
Port: utils.Config.Db.ShadowPort,
User: "postgres",
Password: utils.Config.Db.Password,
Database: "postgres",
}
formatOptions := ""
if utils.Config.Experimental.PgDelta != nil {
formatOptions = strings.TrimSpace(utils.Config.Experimental.PgDelta.FormatOptions)
}
exported, err := diff.DeclarativeExportPgDelta(ctx, shadowConfig, config, schema, formatOptions, options...)
if err != nil {
return err
}
if err := declarative.WriteDeclarativeSchemas(exported, fsys); err != nil {
return err
}
fmt.Fprintln(os.Stderr, "Declarative schema written to "+utils.Bold(utils.GetDeclarativeDir()))
return nil
}
func run(ctx context.Context, schema []string, path string, conn *pgx.Conn, usePgDeltaDiff bool, differ diff.DiffFunc, fsys afero.Fs) error {
config := conn.Config().Config
// 1. Assert `supabase/migrations` and `schema_migrations` are in sync.
if err := assertRemoteInSync(ctx, conn, fsys); errors.Is(err, errMissing) {
// pg_dump strips ownership when restored as a non-superuser, so platform
// objects (FDWs, wasm wrappers, system-owned ACLs) leak into the migration
// and later break `supabase db reset`. pg-delta speaks pg_catalog directly
// and the supabase integration filter drops these by owner, so the diff
// against an empty shadow yields a clean initial migration on its own.
if !usePgDeltaDiff {
// Ignore schemas flag when working on the initial pull
if err = dumpRemoteSchema(ctx, path, config, fsys); err != nil {
return err
}
}
// For the legacy path this is a second pass that captures changes
// pg_dump cannot emit (default privileges, managed schemas). For the
// pg-delta path this is the only pass and produces the full schema.
if err = diffRemoteSchema(ctx, nil, path, config, usePgDeltaDiff, differ, fsys); errors.Is(err, errInSync) {
err = nil
}
return err
} else if err != nil {
return err
}
// 2. Fetch remote schema changes
return diffRemoteSchema(ctx, schema, path, config, usePgDeltaDiff, differ, fsys)
}
func dumpRemoteSchema(ctx context.Context, path string, config pgconn.Config, fsys afero.Fs) error {
// Special case if this is the first migration
fmt.Fprintln(os.Stderr, "Dumping schema from remote database...")
if err := utils.MkdirIfNotExistFS(fsys, filepath.Dir(path)); err != nil {
return err
}
f, err := fsys.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return errors.Errorf("failed to open dump file: %w", err)
}
defer f.Close()
return migration.DumpSchema(ctx, config, f, dump.DockerExec)
}
func diffRemoteSchema(ctx context.Context, schema []string, path string, config pgconn.Config, usePgDeltaDiff bool, differ diff.DiffFunc, fsys afero.Fs) error {
// Diff remote db (source) & shadow db (target) and write it as a new migration.
output, err := diff.DiffDatabase(ctx, schema, config, os.Stderr, fsys, differ, usePgDeltaDiff)
if err != nil {
return err
}
if trimmed := strings.TrimSpace(output); len(trimmed) == 0 {
return errors.New(errInSync)
}
if err := utils.MkdirIfNotExistFS(fsys, filepath.Dir(path)); err != nil {
return err
}
// Append to existing migration file when we run this after dumpRemoteSchema;
// for the pg-delta path this is the only writer and creates the file fresh.
f, err := fsys.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return errors.Errorf("failed to open migration file: %w", err)
}
defer f.Close()
if _, err := f.WriteString(output); err != nil {
return errors.Errorf("failed to write migration file: %w", err)
}
return nil
}
func assertRemoteInSync(ctx context.Context, conn *pgx.Conn, fsys afero.Fs) error {
remoteMigrations, err := migration.ListRemoteMigrations(ctx, conn)
if err != nil {
return err
}
localMigrations, err := list.LoadLocalVersions(fsys)
if err != nil {
return err
}
// Find any mismatch between local and remote migrations
var extraRemote, extraLocal []string
for i, j := 0, 0; i < len(remoteMigrations) || j < len(localMigrations); {
remoteTimestamp := math.MaxInt
if i < len(remoteMigrations) {
if remoteTimestamp, err = strconv.Atoi(remoteMigrations[i]); err != nil {
i++
continue
}
}
localTimestamp := math.MaxInt
if j < len(localMigrations) {
if localTimestamp, err = strconv.Atoi(localMigrations[j]); err != nil {
j++
continue
}
}
// Top to bottom chronological order
if localTimestamp < remoteTimestamp {
extraLocal = append(extraLocal, localMigrations[j])
j++
} else if remoteTimestamp < localTimestamp {
extraRemote = append(extraRemote, remoteMigrations[i])
i++
} else {
i++
j++
}
}
// Suggest delete local migrations / reset migration history
if len(extraRemote)+len(extraLocal) > 0 {
utils.CmdSuggestion = suggestMigrationRepair(extraRemote, extraLocal)
return errors.New(errConflict)
}
if len(localMigrations) == 0 {
return errors.New(errMissing)
}
return nil
}
func suggestMigrationRepair(extraRemote, extraLocal []string) string {
result := fmt.Sprintln("\nMake sure your local git repo is up-to-date. If the error persists, try repairing the migration history table:")
for _, version := range extraRemote {
result += fmt.Sprintln(utils.Bold("supabase migration repair --status reverted " + version))
}
for _, version := range extraLocal {
result += fmt.Sprintln(utils.Bold("supabase migration repair --status applied " + version))
}
return result
}