Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions internal/db/diff/templates/pgdelta.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import {
createPlan,
deserializeCatalog,
formatSqlStatements,
} from "npm:@supabase/pg-delta@1.0.0-alpha.11";
import { supabase } from "npm:@supabase/pg-delta@1.0.0-alpha.11/integrations/supabase";
} from "npm:@supabase/pg-delta@1.0.0-alpha.13";
import { supabase } from "npm:@supabase/pg-delta@1.0.0-alpha.13/integrations/supabase";

async function resolveInput(ref: string | undefined) {
if (!ref) {
Expand Down
2 changes: 1 addition & 1 deletion internal/db/diff/templates/pgdelta_catalog_export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
extractCatalog,
serializeCatalog,
stringifyCatalogSnapshot,
} from "npm:@supabase/pg-delta@1.0.0-alpha.11";
} from "npm:@supabase/pg-delta@1.0.0-alpha.13";

const target = Deno.env.get("TARGET");
const role = Deno.env.get("ROLE") ?? undefined;
Expand Down
14 changes: 3 additions & 11 deletions internal/db/diff/templates/pgdelta_declarative_export.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import {
createPlan,
deserializeCatalog,
exportDeclarativeSchema,
} from "npm:@supabase/pg-delta@1.0.0-alpha.11";
import { supabase } from "npm:@supabase/pg-delta@1.0.0-alpha.11/integrations/supabase";
} from "npm:@supabase/pg-delta@1.0.0-alpha.13";
import { supabase } from "npm:@supabase/pg-delta@1.0.0-alpha.13/integrations/supabase";

async function resolveInput(ref: string | undefined) {
if (!ref) {
Expand All @@ -21,14 +21,6 @@ async function resolveInput(ref: string | undefined) {

const source = Deno.env.get("SOURCE");
const target = Deno.env.get("TARGET");
supabase.filter = {
// Also allow dropped extensions from migrations to be captured in the declarative schema export
// TODO: fix upstream bug into pgdelta supabase integration
or: [
...supabase.filter!.or!,
{ objectType: "extension", operation: "drop", scope: "object" },
],
};

const includedSchemas = Deno.env.get("INCLUDED_SCHEMAS");
if (includedSchemas) {
Expand All @@ -46,7 +38,6 @@ let formatOptions = undefined;
if (formatOptionsRaw) {
formatOptions = JSON.parse(formatOptionsRaw);
}

try {
const result = await createPlan(
await resolveInput(source),
Expand All @@ -66,6 +57,7 @@ try {
);
} else {
const output = exportDeclarativeSchema(result, {
integration: supabase,
formatOptions,
});
console.log(
Expand Down
2 changes: 1 addition & 1 deletion internal/db/pgcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import {
extractCatalog,
serializeCatalog,
stringifyCatalogSnapshot,
} from "npm:@supabase/pg-delta@1.0.0-alpha.11";
} from "npm:@supabase/pg-delta@1.0.0-alpha.13";
const target = Deno.env.get("TARGET");
const role = Deno.env.get("ROLE") ?? undefined;
if (!target) {
Expand Down
149 changes: 136 additions & 13 deletions internal/pgdelta/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"fmt"
"os"
"path/filepath"
"strings"

"github.com/go-errors/errors"
"github.com/jackc/pgconn"
"github.com/spf13/afero"
"github.com/spf13/viper"
"github.com/supabase/cli/internal/utils"
)

Expand All @@ -22,13 +24,129 @@ var pgDeltaDeclarativeApplyScript string
//
// The fields are surfaced to provide concise CLI feedback after apply runs.
type ApplyResult struct {
Status string `json:"status"`
TotalStatements int `json:"totalStatements"`
TotalRounds int `json:"totalRounds"`
TotalApplied int `json:"totalApplied"`
TotalSkipped int `json:"totalSkipped"`
Errors []string `json:"errors"`
StuckStatements []string `json:"stuckStatements"`
Status string `json:"status"`
TotalStatements int `json:"totalStatements"`
TotalRounds int `json:"totalRounds"`
TotalApplied int `json:"totalApplied"`
TotalSkipped int `json:"totalSkipped"`
Errors []ApplyIssue `json:"errors"`
StuckStatements []ApplyIssue `json:"stuckStatements"`
}

// ApplyIssue models a pg-delta apply error or stuck statement.
//
// pg-delta may emit either a plain string or a structured object, so unmarshal
// needs to gracefully handle both forms.
type ApplyIssue struct {
Statement *ApplyStatement `json:"statement,omitempty"`
Code string `json:"code,omitempty"`
Message string `json:"message,omitempty"`
IsDependencyError bool `json:"isDependencyError,omitempty"`
}

type ApplyStatement struct {
ID string `json:"id"`
SQL string `json:"sql"`
StatementClass string `json:"statementClass"`
}

func (i *ApplyIssue) UnmarshalJSON(data []byte) error {
trimmed := bytes.TrimSpace(data)
if bytes.Equal(trimmed, []byte("null")) {
*i = ApplyIssue{}
return nil
}
var message string
if err := json.Unmarshal(trimmed, &message); err == nil {
*i = ApplyIssue{Message: message}
return nil
}
type alias ApplyIssue
var parsed alias
if err := json.Unmarshal(trimmed, &parsed); err != nil {
return err
}
*i = ApplyIssue(parsed)
return nil
}

func formatApplyFailure(result ApplyResult) string {
totalStatements := result.TotalStatements
if totalStatements == 0 {
totalStatements = result.TotalApplied + result.TotalSkipped + len(result.StuckStatements)
}
lines := []string{
fmt.Sprintf("pg-delta apply returned status %q.", result.Status),
fmt.Sprintf("%d/%d statements applied in %d round(s); %d skipped.", result.TotalApplied, totalStatements, result.TotalRounds, result.TotalSkipped),
}
if len(result.Errors) > 0 {
lines = append(lines, "Errors:")
for _, issue := range result.Errors {
lines = append(lines, formatApplyIssue(issue))
}
}
if len(result.StuckStatements) > 0 {
lines = append(lines, "Stuck statements:")
for _, issue := range result.StuckStatements {
lines = append(lines, formatApplyIssue(issue))
}
}
return strings.Join(lines, "\n")
}

func formatApplyIssue(issue ApplyIssue) string {
if issue.Statement == nil {
return "- " + formatApplyIssueMessage(issue)
}
title := "- " + issue.Statement.ID
if issue.Statement.StatementClass != "" {
title += " [" + issue.Statement.StatementClass + "]"
}
lines := []string{title}
lines = append(lines, " "+formatApplyIssueMessage(issue))
if sql := formatStatementSQL(issue.Statement.SQL); sql != "" {
lines = append(lines, " SQL: "+sql)
}
return strings.Join(lines, "\n")
}

func formatApplyIssueMessage(issue ApplyIssue) string {
message := strings.TrimSpace(issue.Message)
if message == "" {
message = "unknown pg-delta issue"
}
var metadata []string
if issue.Code != "" {
metadata = append(metadata, "SQLSTATE "+issue.Code)
}
if issue.IsDependencyError {
metadata = append(metadata, "dependency error")
}
if len(metadata) == 0 {
return message
}
return fmt.Sprintf("%s (%s)", message, strings.Join(metadata, ", "))
}

func formatStatementSQL(sql string) string {
normalized := strings.Join(strings.Fields(sql), " ")
const maxLen = 120
if len(normalized) <= maxLen {
return normalized
}
return normalized[:maxLen-3] + "..."
}

func formatDebugJSON(raw []byte) string {
trimmed := bytes.TrimSpace(raw)
if len(trimmed) == 0 {
return ""
}
var indented bytes.Buffer
if err := json.Indent(&indented, trimmed, "", " "); err == nil {
return indented.String()
}
return string(trimmed)
}

// ApplyDeclarative applies files from supabase/declarative to the target
Expand Down Expand Up @@ -64,14 +182,19 @@ func ApplyDeclarative(ctx context.Context, config pgconn.Config, fsys afero.Fs)

var result ApplyResult
if err := json.Unmarshal(stdout.Bytes(), &result); err != nil {
return errors.Errorf("failed to parse pg-delta apply output: %w\nstdout: %s", err, stdout.String())
if viper.GetBool("DEBUG") {
return errors.Errorf("failed to parse pg-delta apply output: %w\nstdout: %s", err, stdout.String())
}
return errors.Errorf("failed to parse pg-delta apply output: %w", err)
}
if result.Status != "success" {
if len(result.Errors) > 0 {
fmt.Fprintf(os.Stderr, "Errors: %v\n", result.Errors)
}
if len(result.StuckStatements) > 0 {
fmt.Fprintf(os.Stderr, "Stuck statements: %v\n", result.StuckStatements)
if viper.GetBool("DEBUG") {
if debugJSON := formatDebugJSON(stdout.Bytes()); len(debugJSON) > 0 {
fmt.Fprintln(os.Stderr, "pg-delta apply result:")
fmt.Fprintln(os.Stderr, debugJSON)
}
} else {
fmt.Fprintln(os.Stderr, formatApplyFailure(result))
}
return errors.Errorf("pg-delta declarative apply failed with status: %s", result.Status)
}
Expand Down
95 changes: 95 additions & 0 deletions internal/pgdelta/apply_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package pgdelta

import (
"encoding/json"
"strings"
"testing"
)

func TestApplyResultUnmarshalStructuredStuckStatements(t *testing.T) {
raw := []byte(`{
"status": "stuck",
"totalStatements": 34,
"totalRounds": 2,
"totalApplied": 29,
"totalSkipped": 0,
"errors": [],
"stuckStatements": [
{
"statement": {
"id": "cluster/extensions/pgmq.sql:0",
"sql": "CREATE EXTENSION pgmq WITH SCHEMA pgmq;",
"statementClass": "CREATE_EXTENSION"
},
"code": "3F000",
"message": "schema \"pgmq\" does not exist",
"isDependencyError": true
}
]
}`)

var result ApplyResult
if err := json.Unmarshal(raw, &result); err != nil {
t.Fatalf("json.Unmarshal() error = %v", err)
}

if got, want := len(result.StuckStatements), 1; got != want {
t.Fatalf("len(StuckStatements) = %d, want %d", got, want)
}

stuck := result.StuckStatements[0]
if stuck.Statement == nil {
t.Fatal("expected structured statement details")
}
if got, want := stuck.Statement.ID, "cluster/extensions/pgmq.sql:0"; got != want {
t.Fatalf("Statement.ID = %q, want %q", got, want)
}
if got, want := stuck.Statement.StatementClass, "CREATE_EXTENSION"; got != want {
t.Fatalf("Statement.StatementClass = %q, want %q", got, want)
}
if got, want := stuck.Code, "3F000"; got != want {
t.Fatalf("Code = %q, want %q", got, want)
}
if got, want := stuck.Message, `schema "pgmq" does not exist`; got != want {
t.Fatalf("Message = %q, want %q", got, want)
}
if !stuck.IsDependencyError {
t.Fatal("expected dependency error to be preserved")
}
}

func TestFormatApplyFailure(t *testing.T) {
result := ApplyResult{
Status: "stuck",
TotalStatements: 34,
TotalRounds: 2,
TotalApplied: 29,
TotalSkipped: 0,
StuckStatements: []ApplyIssue{
{
Statement: &ApplyStatement{
ID: "cluster/extensions/pgmq.sql:0",
SQL: "CREATE EXTENSION pgmq WITH SCHEMA pgmq;",
StatementClass: "CREATE_EXTENSION",
},
Code: "3F000",
Message: `schema "pgmq" does not exist`,
IsDependencyError: true,
},
},
}

formatted := formatApplyFailure(result)
assertContains(t, formatted, `pg-delta apply returned status "stuck"`)
assertContains(t, formatted, `29/34 statements applied in 2 round(s)`)
assertContains(t, formatted, `cluster/extensions/pgmq.sql:0 [CREATE_EXTENSION]`)
assertContains(t, formatted, `schema "pgmq" does not exist (SQLSTATE 3F000, dependency error)`)
assertContains(t, formatted, `SQL: CREATE EXTENSION pgmq WITH SCHEMA pgmq;`)
}

func assertContains(t *testing.T, text, want string) {
t.Helper()
if !strings.Contains(text, want) {
t.Fatalf("expected %q to contain %q", text, want)
}
}
2 changes: 1 addition & 1 deletion internal/pgdelta/templates/pgdelta_declarative_apply.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import {
applyDeclarativeSchema,
loadDeclarativeSchema,
} from "npm:@supabase/pg-delta@1.0.0-alpha.11/declarative";
} from "npm:@supabase/pg-delta@1.0.0-alpha.13/declarative";

const schemaPath = Deno.env.get("SCHEMA_PATH");
const target = Deno.env.get("TARGET");
Expand Down
Loading