Skip to content

Commit dc828ef

Browse files
committed
support parallel column metrics computation
1 parent b8cfa85 commit dc828ef

9 files changed

Lines changed: 25 additions & 16 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ go.work.sum
2727
# env file
2828
.env
2929

30+
build

cmd/check.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ By automating these checks, you can proactively identify and address data qualit
109109
},
110110
}
111111

112-
cmd.Flags().StringVarP(&checksFile, "checks", "c", "", "Path to data quality checks file")
112+
cmd.Flags().StringVarP(&checksFile, "checks", "c", "", "path to data quality checks file")
113113
_ = cmd.MarkFlagRequired("checks")
114114

115115
return cmd

cmd/import.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ This command is useful for quickly onboarding data from external systems, allowi
7070
},
7171
}
7272

73-
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource from which datasets will be imported")
74-
cmd.Flags().StringVarP(&filter, "filter", "f", "", "Filter applied for dataset selection")
75-
cmd.Flags().BoolVarP(&updateCfg, "update-config", "u", false, "Update dbq config file in place")
73+
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "datasource from which datasets will be imported")
74+
cmd.Flags().StringVarP(&filter, "filter", "f", "", "filter applied for dataset selection")
75+
cmd.Flags().BoolVarP(&updateCfg, "update-config", "u", false, "update dbq config file in place")
7676

7777
return cmd
7878
}

cmd/ping.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ This is useful for quickly determining if the data source is online and respondi
5050
},
5151
}
5252

53-
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource to ping")
53+
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "datasource to ping")
5454

5555
return cmd
5656
}

cmd/profile.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@ import (
2020
"github.com/DataBridgeTech/dbqcore"
2121
"github.com/DataBridgeTech/dbqctl/internal"
2222
"github.com/spf13/cobra"
23+
"runtime"
2324
)
2425

26+
type ProfileResultOutput struct {
27+
Profiles map[string]*dbqcore.TableMetrics `json:"profiles"`
28+
}
29+
2530
func NewProfileCommand(app internal.DbqCliApp) *cobra.Command {
2631
var dataSource string
2732
var dataSet string
2833
var sample bool
34+
var maxConcurrent int
2935

3036
cmd := &cobra.Command{
3137
Use: "profile",
@@ -50,13 +56,13 @@ and helps in making better decisions about data processing and analysis.
5056
}
5157
}
5258

53-
profileResults := &dbqcore.ProfileResultOutput{
59+
profileResults := &ProfileResultOutput{
5460
Profiles: make(map[string]*dbqcore.TableMetrics),
5561
}
5662

5763
for _, curDataSet := range dataSetsToProfile {
58-
fmt.Printf("Profiling '%s', this may take some time...\n", curDataSet)
59-
metrics, err := app.ProfileDataset(dataSource, curDataSet, sample)
64+
fmt.Printf("Profiling '%s' (using %d jobs), this may take some time...\n", curDataSet, maxConcurrent)
65+
metrics, err := app.ProfileDataset(dataSource, curDataSet, sample, maxConcurrent)
6066
if err != nil {
6167
fmt.Printf("Failed to profile %s: %s\n", curDataSet, err)
6268
} else {
@@ -76,11 +82,12 @@ and helps in making better decisions about data processing and analysis.
7682
},
7783
}
7884

79-
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "Datasource in which datasets will be profiled")
85+
cmd.Flags().StringVarP(&dataSource, "datasource", "d", "", "datasource in which datasets will be profiled")
8086
_ = cmd.MarkFlagRequired("datasource")
8187

82-
cmd.Flags().StringVarP(&dataSet, "dataset", "s", "", "Dataset within specified data source")
83-
cmd.Flags().BoolVarP(&sample, "sample", "m", false, "Include data samples in profiling report")
88+
cmd.Flags().StringVarP(&dataSet, "dataset", "s", "", "dataset within specified data source")
89+
cmd.Flags().BoolVarP(&sample, "sample", "m", false, "include data samples in profiling report")
90+
cmd.Flags().IntVarP(&maxConcurrent, "jobs", "j", runtime.NumCPU(), "set the maximum number of jobs to execute against the datasource during profiling. By default, this is equal to the number of CPUs on the host machine.")
8491

8592
return cmd
8693
}

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,5 @@ func init() {
5252
// workaround for bootstrap config flag & unsupported flag issue
5353
var dbqConfigFile string
5454
rootCmd.PersistentFlags().StringVar(&dbqConfigFile, "config", "", "config file (default is $HOME/.dbq.yaml or ./dbq.yaml)")
55-
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "Enables verbose logging")
55+
rootCmd.PersistentFlags().BoolVarP(&verbose, "verbose", "v", false, "enables verbose logging")
5656
}

cmd/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
)
2323

2424
const (
25-
DbqCtlVersion = "v0.0.7"
25+
DbqCtlVersion = "v0.1.0"
2626
)
2727

2828
func NewVersionCommand() *cobra.Command {

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ github.com/ClickHouse/clickhouse-go/v2 v2.34.0 h1:Y4rqkdrRHgExvC4o/NTbLdY5LFQ3LH
44
github.com/ClickHouse/clickhouse-go/v2 v2.34.0/go.mod h1:yioSINoRLVZkLyDzdMXPLRIqhDvel8iLBlwh6Iefso8=
55
github.com/DataBridgeTech/dbqcore v0.0.2 h1:fvGqKOlphcvERnQl7cLIbm7IRhprGbY7la29U9a0fR8=
66
github.com/DataBridgeTech/dbqcore v0.0.2/go.mod h1:U2++eavpf8oSKxukSEJpKGfPaOtkApA7AUpzwkrXmVM=
7+
github.com/DataBridgeTech/dbqcore v0.0.3/go.mod h1:U2++eavpf8oSKxukSEJpKGfPaOtkApA7AUpzwkrXmVM=
78
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
89
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
910
github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=

internal/app.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ import (
3030
type DbqCliApp interface {
3131
PingDataSource(srcId string) (string, error)
3232
ImportDatasets(srcId string, filter string) ([]string, error)
33-
ProfileDataset(srcId string, dataset string, sample bool) (*dbqcore.TableMetrics, error)
33+
ProfileDataset(srcId string, dataset string, sample bool, maxConcurrent int) (*dbqcore.TableMetrics, error)
3434
RunCheck(check *dbqcore.Check, dataSource *dbqcore.DataSource, dataset string, defaultWhere string) (bool, string, error)
3535
GetDbqConfig() *dbqcore.DbqConfig
3636
SaveDbqConfig() error
@@ -80,15 +80,15 @@ func (app *DbqAppImpl) ImportDatasets(srcId string, filter string) ([]string, er
8080
return cnn.ImportDatasets(filter)
8181
}
8282

83-
func (app *DbqAppImpl) ProfileDataset(srcId string, dataset string, sample bool) (*dbqcore.TableMetrics, error) {
83+
func (app *DbqAppImpl) ProfileDataset(srcId string, dataset string, sample bool, maxConcurrent int) (*dbqcore.TableMetrics, error) {
8484
var dataSource = app.FindDataSourceById(srcId)
8585

8686
cnn, err := getDbqConnector(*dataSource, app.logLevel)
8787
if err != nil {
8888
return nil, err
8989
}
9090

91-
return cnn.ProfileDataset(dataset, sample)
91+
return cnn.ProfileDataset(dataset, sample, maxConcurrent)
9292
}
9393

9494
func (app *DbqAppImpl) GetDbqConfig() *dbqcore.DbqConfig {

0 commit comments

Comments
 (0)