Skip to content

Commit cac48ef

Browse files
committed
persist cmvs in db
1 parent 3482162 commit cac48ef

9 files changed

Lines changed: 136 additions & 2 deletions

File tree

CMV_SUPPORT.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ row, err := table.ReadRow(ctx, "region-a#account-42#...")
8282
creation are not reflected in the CMV table.
8383
- **Backfill**: Data written to the source table before the CMV is registered is not
8484
retroactively copied.
85-
- **Persistence**: CMV registrations are in-memory only and do not survive a restart.
86-
Re-register via `CreateMaterializedView` on each startup.
85+
- **Persistence**: CMV registrations are persisted to SQLite alongside table data and are
86+
automatically restored on startup. Shadow table row data is also persistent (it is stored
87+
as a regular table in `tables_t`).
8788

8889
## Example: Key Transformation
8990

bttest/cmv_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ func newTestServerWithCMV(t *testing.T, configs []CMVConfig) *server {
2727
materializedViews: make(map[string]*btapb.MaterializedView),
2828
db: db,
2929
tableBackend: NewSqlTables(db),
30+
mvBackend: NewSqlMaterializedViews(db),
3031
cmvs: newCMVRegistry(),
3132
}
3233
for _, cfg := range configs {
@@ -287,6 +288,7 @@ func TestCreateMaterializedViewRPC(t *testing.T) {
287288
materializedViews: make(map[string]*btapb.MaterializedView),
288289
db: db,
289290
tableBackend: NewSqlTables(db),
291+
mvBackend: NewSqlMaterializedViews(db),
290292
cmvs: newCMVRegistry(),
291293
}
292294

@@ -431,6 +433,7 @@ func newTestInstanceServer(t *testing.T) (*server, string) {
431433
materializedViews: make(map[string]*btapb.MaterializedView),
432434
db: db,
433435
tableBackend: NewSqlTables(db),
436+
mvBackend: NewSqlMaterializedViews(db),
434437
cmvs: newCMVRegistry(),
435438
}
436439
return s, "projects/test/instances/test"

bttest/inmem.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type server struct {
9494
gcc chan int // set when gcloop starts, closed when server shuts down
9595
db *sql.DB
9696
tableBackend *SqlTables
97+
mvBackend *SqlMaterializedViews
9798
cmvs *cmvRegistry
9899

99100
materializedViews map[string]*btapb.MaterializedView // keyed by full resource name
@@ -123,6 +124,7 @@ func NewServer(laddr string, db *sql.DB, opt ...grpc.ServerOption) (*Server, err
123124
materializedViews: make(map[string]*btapb.MaterializedView),
124125
db: db,
125126
tableBackend: NewSqlTables(db),
127+
mvBackend: NewSqlMaterializedViews(db),
126128
cmvs: newCMVRegistry(),
127129
},
128130
}
@@ -131,6 +133,7 @@ func NewServer(laddr string, db *sql.DB, opt ...grpc.ServerOption) (*Server, err
131133
}
132134
longrunningpb.RegisterOperationsServer(s.srv, opsServer)
133135
s.s.LoadTables()
136+
s.s.LoadMaterializedViews()
134137
btapb.RegisterBigtableInstanceAdminServer(s.srv, s.s)
135138
btapb.RegisterBigtableTableAdminServer(s.srv, s.s)
136139
btpb.RegisterBigtableServer(s.srv, s.s)
@@ -159,6 +162,36 @@ func (s *server) LoadTables() {
159162
}
160163
}
161164

165+
// LoadMaterializedViews restores persisted CMV metadata from SQLite on startup.
166+
// For each stored view, it re-parses the SQL to reconstruct the CMVConfig and
167+
// re-registers it so that write-time propagation resumes immediately.
168+
func (s *server) LoadMaterializedViews() {
169+
for _, v := range s.mvBackend.GetAll() {
170+
cfg, err := ParseCMVConfigFromSQL(viewIDFromName(v.name), v.query)
171+
if err != nil {
172+
// Log and skip rather than crashing — the parser may have evolved.
173+
log.Printf("WARNING: skipping persisted materialized view %q: could not parse query: %v", v.name, err)
174+
continue
175+
}
176+
s.cmvs.register(*cfg)
177+
s.materializedViews[v.name] = &btapb.MaterializedView{
178+
Name: v.name,
179+
Query: v.query,
180+
DeletionProtection: v.deletionProtection,
181+
}
182+
}
183+
}
184+
185+
// viewIDFromName extracts the view ID from a full resource name of the form
186+
// projects/.../instances/.../materializedViews/<id>.
187+
func viewIDFromName(name string) string {
188+
const sep = "/materializedViews/"
189+
if idx := strings.LastIndex(name, sep); idx >= 0 {
190+
return name[idx+len(sep):]
191+
}
192+
return name
193+
}
194+
162195
// ensureCMVTable creates the shadow table for a CMV if it doesn't already exist.
163196
// Copies column families from the source table, filtered by IncludeFamilies.
164197
// Must be called with s.mu held.
@@ -383,6 +416,7 @@ func (s *server) DeleteTable(ctx context.Context, req *btapb.DeleteTableRequest)
383416
}
384417
for mvName := range s.materializedViews {
385418
if strings.HasSuffix(mvName, "/materializedViews/"+viewID) {
419+
s.mvBackend.Delete(mvName)
386420
delete(s.materializedViews, mvName)
387421
break
388422
}

bttest/inmem_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func newTestServer(t *testing.T) *server {
7878
tables: make(map[string]*table),
7979
db: db,
8080
tableBackend: NewSqlTables(db),
81+
mvBackend: NewSqlMaterializedViews(db),
8182
cmvs: newCMVRegistry(),
8283
}
8384
return s

bttest/instance_server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ func (s *server) CreateMaterializedView(ctx context.Context, req *btapb.CreateMa
176176
DeletionProtection: mv.DeletionProtection,
177177
}
178178
s.materializedViews[name] = stored
179+
s.mvBackend.Save(name, mv.Query, mv.DeletionProtection)
179180

180181
respAny, err := anypb.New(stored)
181182
if err != nil {
@@ -236,6 +237,7 @@ func (s *server) UpdateMaterializedView(ctx context.Context, req *btapb.UpdateMa
236237
return nil, status.Errorf(codes.InvalidArgument, "unsupported update field: %q", path)
237238
}
238239
}
240+
s.mvBackend.Save(stored.Name, stored.Query, stored.DeletionProtection)
239241

240242
respAny, err := anypb.New(stored)
241243
if err != nil {
@@ -271,6 +273,7 @@ func (s *server) DeleteMaterializedView(ctx context.Context, req *btapb.DeleteMa
271273
delete(s.tables, fqShadow)
272274
}
273275
}
276+
s.mvBackend.Delete(req.Name)
274277
delete(s.materializedViews, req.Name)
275278
return new(empty.Empty), nil
276279
}

bttest/sql_materialized_views.go

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package bttest
2+
3+
import (
4+
"database/sql"
5+
"log"
6+
)
7+
8+
// SqlMaterializedViews persists materialized view metadata to materialized_views_t.
9+
type SqlMaterializedViews struct {
10+
db *sql.DB
11+
}
12+
13+
// NewSqlMaterializedViews returns a SqlMaterializedViews backed by the given DB.
14+
func NewSqlMaterializedViews(db *sql.DB) *SqlMaterializedViews {
15+
return &SqlMaterializedViews{db: db}
16+
}
17+
18+
type storedMaterializedView struct {
19+
name string
20+
query string
21+
deletionProtection bool
22+
}
23+
24+
// GetAll returns all persisted materialized views, used to restore state on startup.
25+
func (m *SqlMaterializedViews) GetAll() []storedMaterializedView {
26+
rows, err := m.db.Query("SELECT name, query, deletion_protection FROM materialized_views_t")
27+
if err == sql.ErrNoRows {
28+
return nil
29+
}
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
defer rows.Close()
34+
35+
var result []storedMaterializedView
36+
for rows.Next() {
37+
var v storedMaterializedView
38+
var dp int
39+
if err := rows.Scan(&v.name, &v.query, &dp); err != nil {
40+
log.Fatal(err)
41+
}
42+
v.deletionProtection = dp != 0
43+
result = append(result, v)
44+
}
45+
if err := rows.Err(); err != nil {
46+
log.Fatal(err)
47+
}
48+
return result
49+
}
50+
51+
// Save upserts a materialized view record. Called on CreateMaterializedView and
52+
// UpdateMaterializedView to keep the persisted state in sync with in-memory state.
53+
func (m *SqlMaterializedViews) Save(name, query string, deletionProtection bool) {
54+
dp := 0
55+
if deletionProtection {
56+
dp = 1
57+
}
58+
_, err := m.db.Exec(
59+
"INSERT INTO materialized_views_t (name, query, deletion_protection) VALUES (?, ?, ?)"+
60+
" ON CONFLICT(name) DO UPDATE SET query=excluded.query, deletion_protection=excluded.deletion_protection",
61+
name, query, dp,
62+
)
63+
if err != nil {
64+
log.Fatalf("saving materialized view %q: %v", name, err)
65+
}
66+
}
67+
68+
// Delete removes a materialized view record by its full resource name.
69+
func (m *SqlMaterializedViews) Delete(name string) {
70+
_, err := m.db.Exec("DELETE FROM materialized_views_t WHERE name = ?", name)
71+
if err != nil {
72+
log.Fatal(err)
73+
}
74+
}

bttest/sql_rows.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type SqlRows struct {
2424
db *sql.DB
2525
}
2626

27+
// NewSqlRows returns a SqlRows for a specific table within the given parent instance path.
2728
func NewSqlRows(db *sql.DB, parent, tableId string) *SqlRows {
2829
return &SqlRows{
2930
parent: parent,

bttest/sql_schema.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"database/sql"
66
)
77

8+
// CreateTables initializes the SQLite schema for the emulator, creating all
9+
// required tables if they do not already exist. Safe to call on an existing DB.
810
func CreateTables(ctx context.Context, db *sql.DB) error {
911
query := "CREATE TABLE IF NOT EXISTS rows_t ( \n" +
1012
"`parent` TEXT NOT NULL,\n" +
@@ -32,5 +34,15 @@ func CreateTables(ctx context.Context, db *sql.DB) error {
3234
if err != nil {
3335
return err
3436
}
37+
38+
query = "CREATE TABLE IF NOT EXISTS materialized_views_t (\n" +
39+
"`name` TEXT PRIMARY KEY,\n" +
40+
"`query` TEXT NOT NULL,\n" +
41+
"`deletion_protection` INTEGER NOT NULL DEFAULT 0\n" +
42+
")"
43+
_, err = db.ExecContext(ctx, query)
44+
if err != nil {
45+
return err
46+
}
3547
return nil
3648
}

bttest/sql_tables.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ type SqlTables struct {
1515
db *sql.DB
1616
}
1717

18+
// NewSqlTables returns a SqlTables backed by the given DB.
1819
func NewSqlTables(db *sql.DB) *SqlTables {
1920
return &SqlTables{
2021
db: db,
@@ -51,6 +52,7 @@ func (t *table) Bytes() ([]byte, error) {
5152
return b.Bytes(), err
5253
}
5354

55+
// Get loads a single table's metadata from the DB. Returns nil if not found.
5456
func (db *SqlTables) Get(parent, tableId string) *table {
5557
tbl := &table{
5658
parent: parent,
@@ -64,6 +66,7 @@ func (db *SqlTables) Get(parent, tableId string) *table {
6466
return tbl
6567
}
6668

69+
// GetAll loads all table metadata from the DB, used to restore state on startup.
6770
func (db *SqlTables) GetAll() []*table {
6871
var tables []*table
6972

@@ -89,6 +92,7 @@ func (db *SqlTables) GetAll() []*table {
8992
return tables
9093
}
9194

95+
// Save upserts a table's metadata (column family definitions) to the DB.
9296
func (db *SqlTables) Save(t *table) {
9397
metadata, err := t.Bytes()
9498
if err != nil {
@@ -103,6 +107,7 @@ func (db *SqlTables) Save(t *table) {
103107
}
104108
}
105109

110+
// Delete removes a table's metadata record from the DB.
106111
func (db *SqlTables) Delete(t *table) {
107112
_, err := db.db.Exec("DELETE FROM tables_t WHERE parent = ? AND table_id = ? ", t.parent, t.tableId)
108113
if err != nil {

0 commit comments

Comments
 (0)