44 "context"
55 "fmt"
66 "os"
7- "sync"
87
98 "github.com/THD-Spatial/City2TABULA/internal/config"
109 "github.com/THD-Spatial/City2TABULA/internal/importer"
@@ -23,7 +22,6 @@ type SetupOperation struct {
2322
2423// CreateCompleteDatabase creates the complete City2TABULA database with CityDB infrastructure
2524func CreateCompleteDatabase (config * config.Config , conn * pgxpool.Pool ) error {
26- utils .Info .Println ("Creating complete City2TABULA database..." )
2725
2826 // Step 1: Create CityDB infrastructure
2927 if err := CreateCityDB (config ); err != nil {
@@ -40,7 +38,6 @@ func CreateCompleteDatabase(config *config.Config, conn *pgxpool.Pool) error {
4038 return fmt .Errorf ("failed to import data: %w" , err )
4139 }
4240
43- utils .Debug .Println ("Complete database created successfully" )
4441 return nil
4542}
4643
@@ -99,23 +96,9 @@ func RunCity2TabulaDBSetup(config *config.Config, conn *pgxpool.Pool) error {
9996 return fmt .Errorf ("failed to setup main DB queue: %w" , err )
10097 }
10198
102- mainPipelineChan := make (chan * process.Pipeline , mainPipelineQueue .Len ())
103- for ! mainPipelineQueue .IsEmpty () {
104- pipeline := mainPipelineQueue .Dequeue ()
105- if pipeline != nil {
106- mainPipelineChan <- pipeline
107- }
108- }
109- close (mainPipelineChan )
110-
111- numWorkers := config .Batch .Threads
112- var wg sync.WaitGroup
113- for i := 1 ; i <= numWorkers ; i ++ {
114- wg .Add (1 )
115- worker := process .NewWorker (i )
116- go worker .Start (mainPipelineChan , conn , & wg , config )
99+ if err := process .RunPipelineQueue (mainPipelineQueue , conn , config ); err != nil {
100+ return fmt .Errorf ("main DB setup failed: %w" , err )
117101 }
118- wg .Wait ()
119102
120103 utils .Info .Println ("Main database setup completed" )
121104
@@ -125,21 +108,9 @@ func RunCity2TabulaDBSetup(config *config.Config, conn *pgxpool.Pool) error {
125108 return fmt .Errorf ("failed to setup supplementary DB queue: %w" , err )
126109 }
127110
128- supplementaryPipelineChan := make (chan * process.Pipeline , supplementaryPipelineQueue .Len ())
129- for ! supplementaryPipelineQueue .IsEmpty () {
130- pipeline := supplementaryPipelineQueue .Dequeue ()
131- if pipeline != nil {
132- supplementaryPipelineChan <- pipeline
133- }
134- }
135- close (supplementaryPipelineChan )
136-
137- for i := 1 ; i <= numWorkers ; i ++ {
138- wg .Add (1 )
139- worker := process .NewWorker (i )
140- go worker .Start (supplementaryPipelineChan , conn , & wg , config )
111+ if err := process .RunPipelineQueue (supplementaryPipelineQueue , conn , config ); err != nil {
112+ return fmt .Errorf ("supplementary DB setup failed: %w" , err )
141113 }
142- wg .Wait ()
143114
144115 utils .Info .Println ("Supplementary database setup completed" )
145116
0 commit comments