@@ -106,6 +106,52 @@ func (cmd *commander) Version(ctx context.Context, id v1.ClickHouseReplicaID) (s
106106 return version , nil
107107}
108108
109+ func (cmd * commander ) SyncDatabases (ctx context.Context , log controllerutil.Logger , replicas []v1.ClickHouseReplicaID ) bool {
110+ result := true
111+
112+ replicaDatabases := controllerutil .ExecuteParallel (replicas , func (id v1.ClickHouseReplicaID ) (v1.ClickHouseReplicaID , map [string ]databaseDescriptor , error ) {
113+ databases , err := cmd .Databases (ctx , id )
114+ return id , databases , err
115+ })
116+
117+ databases := map [string ]databaseDescriptor {}
118+ for id , dbs := range replicaDatabases {
119+ if dbs .Err != nil {
120+ log .Warn ("failed to get databases from replica" , "replica_id" , id , "error" , dbs .Err )
121+
122+ result = false
123+ continue
124+ }
125+
126+ databases = controllerutil .MergeMaps (databases , dbs .Result )
127+ }
128+
129+ results := controllerutil .ExecuteParallel (replicas , func (id v1.ClickHouseReplicaID ) (v1.ClickHouseReplicaID , struct {}, error ) {
130+ if len (databases ) == len (replicaDatabases [id ].Result ) {
131+ return id , struct {}{}, nil
132+ }
133+
134+ dbsToSync := map [string ]databaseDescriptor {}
135+ for name , desc := range databases {
136+ if _ , ok := replicaDatabases [id ].Result [name ]; ! ok {
137+ dbsToSync [name ] = desc
138+ }
139+ }
140+
141+ return id , struct {}{}, cmd .CreateDatabases (ctx , log , id , dbsToSync )
142+ })
143+
144+ for id , res := range results {
145+ if res .Err != nil {
146+ log .Info ("failed to create databases" , "replica_id" , id , "error" , res .Err )
147+
148+ result = false
149+ }
150+ }
151+
152+ return result
153+ }
154+
109155func (cmd * commander ) Databases (ctx context.Context , id v1.ClickHouseReplicaID ) (map [string ]databaseDescriptor , error ) {
110156 conn , err := cmd .getConn (id )
111157 if err != nil {
@@ -169,55 +215,23 @@ func (cmd *commander) CreateDatabases(ctx context.Context, log controllerutil.Lo
169215 return nil
170216}
171217
172- // EnsureDefaultDatabaseEngine ensures that the default database engine is set to the Selected one.
173- func (cmd * commander ) EnsureDefaultDatabaseEngine (ctx context.Context , log controllerutil.Logger , id v1.ClickHouseReplicaID ) error {
174- log = log .With ("replica_id" , id )
175-
176- conn , err := cmd .getConn (id )
177- if err != nil {
178- return fmt .Errorf ("failed to get connection for replica %s: %w" , id , err )
179- }
180-
181- var engine string
182-
183- rows := conn .QueryRow (ctx , "SELECT engine FROM system.databases WHERE name='default' " )
184- if err = rows .Scan (& engine ); err != nil {
185- if ! errors .Is (err , sql .ErrNoRows ) {
186- return fmt .Errorf ("failed to scan default database engine for replica %s: %w" , id , err )
187- }
188-
189- log .Debug ("no default database found" )
190- } else {
191- if engine == "Replicated" {
192- log .Debug ("default database already has the Replicated engine" )
193- return nil
194- }
195-
196- var count uint64
197- if err = conn .QueryRow (ctx , "SELECT COUNT() FROM system.tables WHERE database='default'" ).Scan (& count ); err != nil {
198- log .Error (err , "error checking if database 'default' has tables" )
199- return fmt .Errorf ("check tables in %s: %w" , id , err )
200- }
201-
202- if count > 0 {
203- log .Warn ("database `default` has tables, but its engine is not Replicated, data loss is possible" )
204- }
218+ // EnsureDefaultDatabaseEngine ensures that the default database engine is set to the Replicated.
219+ func (cmd * commander ) EnsureDefaultDatabaseEngine (ctx context.Context , log controllerutil.Logger , replicas []v1.ClickHouseReplicaID ) bool {
220+ res := controllerutil .ExecuteParallel (replicas , func (id v1.ClickHouseReplicaID ) (v1.ClickHouseReplicaID , struct {}, error ) {
221+ err := cmd .ensureReplicaDefaultDatabaseEngine (ctx , log , id )
222+ return id , struct {}{}, err
223+ })
205224
206- log .Debug ("dropping default database" )
225+ result := true
226+ for id , repl := range res {
227+ if repl .Err != nil {
228+ log .Info ("failed to recreate default database as Replicated" , "replica_id" , id , "error" , repl .Err )
207229
208- if err := conn .Exec (ctx , "DROP DATABASE default SYNC" ); err != nil {
209- return fmt .Errorf ("failed to drop default database on replica %s: %w" , id , err )
230+ result = false
210231 }
211232 }
212233
213- log .Debug ("creating replicated default database" )
214-
215- defaultDatabaseUUID := uuid .NewSHA1 (uuid .Nil , []byte (cmd .cluster .SpecificName ())).String ()
216- if err = conn .Exec (ctx , createDefaultDatabaseQuery , defaultDatabaseUUID ); err != nil {
217- return fmt .Errorf ("create default replicated database %s: %w" , id , err )
218- }
219-
220- return nil
234+ return result
221235}
222236
223237func (cmd * commander ) SyncShard (ctx context.Context , log controllerutil.Logger , shardID int32 ) error {
@@ -322,7 +336,7 @@ func (cmd *commander) SyncReplica(ctx context.Context, log controllerutil.Logger
322336func (cmd * commander ) CleanupDatabaseReplicas (
323337 ctx context.Context ,
324338 log controllerutil.Logger ,
325- notInSync map [v1.ClickHouseReplicaID ]struct {},
339+ running map [v1.ClickHouseReplicaID ]struct {},
326340) error {
327341 id , conn , err := cmd .getAnyConn (ctx )
328342 if err != nil {
@@ -357,8 +371,8 @@ func (cmd *commander) CleanupDatabaseReplicas(
357371 continue
358372 }
359373
360- if _ , ok := notInSync [toDrop ]; ok {
361- log .Debug ("skipping stale database replica cleanup that is not in sync " , "database" , database , "replica_id" , toDrop )
374+ if _ , ok := running [toDrop ]; ok {
375+ log .Debug ("skipping stale database replica cleanup that is still running " , "database" , database , "replica_id" , toDrop )
362376 continue
363377 }
364378
@@ -442,3 +456,52 @@ func (cmd *commander) getAnyConn(ctx context.Context) (v1.ClickHouseReplicaID, c
442456
443457 return v1.ClickHouseReplicaID {}, nil , errors .New ("no available connections" )
444458}
459+
460+ func (cmd * commander ) ensureReplicaDefaultDatabaseEngine (ctx context.Context , log controllerutil.Logger , id v1.ClickHouseReplicaID ) error {
461+ log = log .With ("replica_id" , id )
462+
463+ conn , err := cmd .getConn (id )
464+ if err != nil {
465+ return fmt .Errorf ("failed to get connection for replica %s: %w" , id , err )
466+ }
467+
468+ var engine string
469+
470+ rows := conn .QueryRow (ctx , "SELECT engine FROM system.databases WHERE name='default' " )
471+ if err = rows .Scan (& engine ); err != nil {
472+ if ! errors .Is (err , sql .ErrNoRows ) {
473+ return fmt .Errorf ("failed to scan default database engine for replica %s: %w" , id , err )
474+ }
475+
476+ log .Debug ("no default database found" )
477+ } else {
478+ if engine == "Replicated" {
479+ return nil
480+ }
481+
482+ var count uint64
483+ if err = conn .QueryRow (ctx , "SELECT COUNT() FROM system.tables WHERE database='default'" ).Scan (& count ); err != nil {
484+ log .Error (err , "error checking if database 'default' has tables" )
485+ return fmt .Errorf ("check tables in %s: %w" , id , err )
486+ }
487+
488+ if count > 0 {
489+ log .Warn ("database `default` has tables, but its engine is not Replicated, data loss is possible" )
490+ }
491+
492+ log .Debug ("dropping default database" )
493+
494+ if err := conn .Exec (ctx , "DROP DATABASE default SYNC" ); err != nil {
495+ return fmt .Errorf ("failed to drop default database on replica %s: %w" , id , err )
496+ }
497+ }
498+
499+ log .Debug ("creating replicated default database" )
500+
501+ defaultDatabaseUUID := uuid .NewSHA1 (uuid .Nil , []byte (cmd .cluster .SpecificName ())).String ()
502+ if err = conn .Exec (ctx , createDefaultDatabaseQuery , defaultDatabaseUUID ); err != nil {
503+ return fmt .Errorf ("create default replicated database %s: %w" , id , err )
504+ }
505+
506+ return nil
507+ }
0 commit comments